diff --git a/src/nanoarrow/device/cuda.c b/src/nanoarrow/device/cuda.c index 56ff4dfd6..d2db25a8c 100644 --- a/src/nanoarrow/device/cuda.c +++ b/src/nanoarrow/device/cuda.c @@ -100,9 +100,10 @@ static void ArrowDeviceCudaDeallocator(struct ArrowBufferAllocator* allocator, ArrowFree(allocator_private); } -static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device, - struct ArrowBuffer* buffer, - int64_t size_bytes) { +static ArrowErrorCode ArrowDeviceCudaAllocateBufferAsync(struct ArrowDevice* device, + struct ArrowBuffer* buffer, + int64_t size_bytes, + CUstream hstream) { struct ArrowDeviceCudaPrivate* private_data = (struct ArrowDeviceCudaPrivate*)device->private_data; @@ -124,11 +125,14 @@ static ArrowErrorCode ArrowDeviceCudaAllocateBuffer(struct ArrowDevice* device, switch (device->device_type) { case ARROW_DEVICE_CUDA: { CUdeviceptr dptr = 0; - if (size_bytes > 0) { // cuMemalloc requires non-zero size_bytes - err = cuMemAlloc(&dptr, (size_t)size_bytes); + + // cuMemAllocAsync requires non-zero size_bytes + if (size_bytes > 0) { + err = cuMemAllocAsync(&dptr, (size_t)size_bytes, hstream); } else { err = CUDA_SUCCESS; } + ptr = (void*)dptr; op = "cuMemAllocAsync"; break; @@ -181,51 +185,105 @@ static void ArrowDeviceCudaArrayRelease(struct ArrowArray* array) { array->release = NULL; } -static ArrowErrorCode ArrowDeviceCudaArrayInit(struct ArrowDevice* device, - struct ArrowDeviceArray* device_array, - struct ArrowArray* array, - void* sync_event) { +static ArrowErrorCode ArrowDeviceCudaArrayInitInternal( + struct ArrowDevice* device, struct ArrowDeviceArray* device_array, + struct ArrowArray* array, CUevent cu_event) { struct ArrowDeviceCudaPrivate* device_private = (struct ArrowDeviceCudaPrivate*)device->private_data; - // One can create an event with cuEventCreate(&cu_event, CU_EVENT_DEFAULT); - // Requires cuCtxPushCurrent() + cuEventCreate() + cuCtxPopCurrent() - struct ArrowDeviceCudaArrayPrivate* private_data = + struct ArrowDeviceCudaArrayPrivate* array_private = (struct ArrowDeviceCudaArrayPrivate*)ArrowMalloc( sizeof(struct ArrowDeviceCudaArrayPrivate)); - if (private_data == NULL) { + if (array_private == NULL) { return ENOMEM; } memset(device_array, 0, sizeof(struct ArrowDeviceArray)); device_array->array = *array; - device_array->array.private_data = private_data; + device_array->array.private_data = array_private; device_array->array.release = &ArrowDeviceCudaArrayRelease; - ArrowArrayMove(array, &private_data->parent); + ArrowArrayMove(array, &array_private->parent); device_array->device_id = device->device_id; device_array->device_type = device->device_type; - if (sync_event != NULL) { - private_data->cu_event = *((CUevent*)sync_event); - device_array->sync_event = sync_event; + if (cu_event != NULL) { + array_private->cu_event = cu_event; + device_array->sync_event = &array_private->cu_event; } else { - private_data->cu_event = NULL; + array_private->cu_event = NULL; device_array->sync_event = NULL; } return NANOARROW_OK; } -// TODO: All these buffer copiers would benefit from cudaMemcpyAsync but there is -// no good way to incorporate that just yet +static ArrowErrorCode ArrowDeviceCudaArrayInitAsync(struct ArrowDevice* device, + struct ArrowDeviceArray* device_array, + struct ArrowArray* array, + void* sync_event, void* stream) { + struct ArrowDeviceCudaPrivate* private_data = + (struct ArrowDeviceCudaPrivate*)device->private_data; + + NANOARROW_CUDA_RETURN_NOT_OK(cuCtxPushCurrent(private_data->cu_context), + "cuCtxPushCurrent", NULL); + CUcontext unused;
// needed for cuCtxPopCurrent() + + CUevent cu_event; + if (sync_event == NULL) { + cu_event = NULL; + } else { + cu_event = *((CUevent*)sync_event); + } + + // If the stream was passed, it means that we are required to ensure that + // the event that is exported by the output array captures the work that + // has been done on stream. If we were not given an event to take ownership of, + // this means we need to create one. + CUevent cu_event_tmp = NULL; + CUresult err; + + if (stream != NULL && cu_event == NULL) { + // Events are faster with timing disabled (a user can provide their + // own event if they want timing enabled) + err = cuEventCreate(&cu_event_tmp, CU_EVENT_DISABLE_TIMING); + if (err != CUDA_SUCCESS) { + NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); + NANOARROW_CUDA_RETURN_NOT_OK(err, "cuEventCreate", NULL); + } + + cu_event = cu_event_tmp; + } + + if (stream != NULL) { + err = cuEventRecord(cu_event, *((CUstream*)stream)); + if (err != CUDA_SUCCESS) { + NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); + if (cu_event_tmp != NULL) { + NANOARROW_CUDA_ASSERT_OK(cuEventDestroy(cu_event_tmp)); + } + + NANOARROW_CUDA_RETURN_NOT_OK(err, "cuEventRecord", NULL); + } + } -static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* device_src, - struct ArrowBufferView src, - struct ArrowDevice* device_dst, - struct ArrowBufferView dst, - int* n_pop_context, - struct ArrowError* error) { + int result = ArrowDeviceCudaArrayInitInternal(device, device_array, array, cu_event); + NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); + if (result != NANOARROW_OK) { + if (cu_event_tmp != NULL) { + NANOARROW_CUDA_ASSERT_OK(cuEventDestroy(cu_event_tmp)); + } + + return result; + } + + return NANOARROW_OK; +} + +static ArrowErrorCode ArrowDeviceCudaBufferCopyAsyncInternal( + struct ArrowDevice* device_src, struct ArrowBufferView src, + struct ArrowDevice* device_dst, struct ArrowBufferView dst, int* n_pop_context, + struct ArrowError* error, CUstream hstream) { // Note: the device_src/sync event must be synchronized before calling these methods, // even though the cuMemcpyXXX() functions may do this automatically in some cases.
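(For readers of this patch, here is a minimal caller-side sketch of the event/stream contract implemented above. It assumes a current CUDA context and a caller-created stream; `MakeDeviceArrayOnStream` and its arguments are illustrative, not part of this patch.)

```c
#include <cuda.h>
#include "nanoarrow_device.h"

// Hypothetical helper: wraps a device ArrowArray whose buffers were just
// produced by kernels launched on hstream.
static ArrowErrorCode MakeDeviceArrayOnStream(struct ArrowDevice* gpu,
                                              struct ArrowArray* array,
                                              CUstream hstream,
                                              struct ArrowDeviceArray* out) {
  // sync_event == NULL and stream != NULL: ArrowDeviceCudaArrayInitAsync
  // creates an event, records it on hstream, and transfers ownership of the
  // event (and of array) to the output device array.
  return ArrowDeviceArrayInitAsync(gpu, out, array, /*sync_event=*/NULL, &hstream);
}
```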
@@ -238,7 +296,8 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi (*n_pop_context)++; NANOARROW_CUDA_RETURN_NOT_OK( - cuMemcpyHtoD((CUdeviceptr)dst.data.data, src.data.data, (size_t)src.size_bytes), - "cuMemcpyHtoD", error); + cuMemcpyHtoDAsync((CUdeviceptr)dst.data.data, src.data.data, + (size_t)src.size_bytes, hstream), + "cuMemcpyHtoDAsync", error); } else if (device_src->device_type == ARROW_DEVICE_CUDA && @@ -252,9 +311,9 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi (*n_pop_context)++; NANOARROW_CUDA_RETURN_NOT_OK( - cuMemcpyDtoD((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data, - (size_t)src.size_bytes), - "cuMemcpytoD", error); + cuMemcpyDtoDAsync((CUdeviceptr)dst.data.data, (CUdeviceptr)src.data.data, + (size_t)src.size_bytes, hstream), + "cuMemcpyDtoDAsync", error); } else if (device_src->device_type == ARROW_DEVICE_CUDA && device_dst->device_type == ARROW_DEVICE_CUDA) { @@ -264,10 +323,10 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi (struct ArrowDeviceCudaPrivate*)device_dst->private_data; NANOARROW_CUDA_RETURN_NOT_OK( - cuMemcpyPeer((CUdeviceptr)dst.data.data, dst_private->cu_context, - (CUdeviceptr)src.data.data, src_private->cu_context, - (size_t)src.size_bytes), - "cuMemcpyPeer", error); + cuMemcpyPeerAsync((CUdeviceptr)dst.data.data, dst_private->cu_context, + (CUdeviceptr)src.data.data, src_private->cu_context, + (size_t)src.size_bytes, hstream), + "cuMemcpyPeerAsync", error); } else if (device_src->device_type == ARROW_DEVICE_CUDA && device_dst->device_type == ARROW_DEVICE_CPU) { @@ -278,9 +337,9 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi "cuCtxPushCurrent", error); (*n_pop_context)++; NANOARROW_CUDA_RETURN_NOT_OK( - cuMemcpyDtoH((void*)dst.data.data, (CUdeviceptr)src.data.data, - (size_t)src.size_bytes), - "cuMemcpyDtoH", error); + cuMemcpyDtoHAsync((void*)dst.data.data, (CUdeviceptr)src.data.data, + (size_t)src.size_bytes, hstream), + "cuMemcpyDtoHAsync", error); } else if (device_src->device_type == ARROW_DEVICE_CPU && device_dst->device_type == ARROW_DEVICE_CUDA_HOST) { @@ -301,15 +360,22 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopyInternal(struct ArrowDevice* devi return NANOARROW_OK; } -static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src, - struct ArrowBufferView src, - struct ArrowDevice* device_dst, - struct ArrowBufferView dst) { +static ArrowErrorCode ArrowDeviceCudaBufferCopyAsync(struct ArrowDevice* device_src, + struct ArrowBufferView src, + struct ArrowDevice* device_dst, + struct ArrowBufferView dst, + void* stream) { + if (stream == NULL) { + return EINVAL; + } + + CUstream hstream = *((CUstream*)stream); + int n_pop_context = 0; struct ArrowError error; - int result = ArrowDeviceCudaBufferCopyInternal(device_src, src, device_dst, dst, - &n_pop_context, &error); + int result = ArrowDeviceCudaBufferCopyAsyncInternal(device_src, src, device_dst, dst, + &n_pop_context, &error, hstream); for (int i = 0; i < n_pop_context; i++) { CUcontext unused; NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); @@ -318,17 +384,24 @@ static ArrowErrorCode ArrowDeviceCudaBufferCopy(struct ArrowDevice* device_src, return result; } -static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src, - struct ArrowBufferView src, - struct ArrowDevice* device_dst, - struct ArrowBuffer* dst) { +static ArrowErrorCode ArrowDeviceCudaBufferInitAsync(struct ArrowDevice* device_src, + struct ArrowBufferView src, +
struct ArrowDevice* device_dst, + struct ArrowBuffer* dst, + void* stream) { + if (stream == NULL) { + return EINVAL; + } + + CUstream hstream = *((CUstream*)stream); + struct ArrowBuffer tmp; switch (device_dst->device_type) { case ARROW_DEVICE_CUDA: case ARROW_DEVICE_CUDA_HOST: NANOARROW_RETURN_NOT_OK( - ArrowDeviceCudaAllocateBuffer(device_dst, &tmp, src.size_bytes)); + ArrowDeviceCudaAllocateBufferAsync(device_dst, &tmp, src.size_bytes, hstream)); break; case ARROW_DEVICE_CPU: ArrowBufferInit(&tmp); @@ -341,7 +414,8 @@ static ArrowErrorCode ArrowDeviceCudaBufferInit(struct ArrowDevice* device_src, struct ArrowBufferView tmp_view; tmp_view.data.data = tmp.data; tmp_view.size_bytes = tmp.size_bytes; - int result = ArrowDeviceCudaBufferCopy(device_src, src, device_dst, tmp_view); + int result = + ArrowDeviceCudaBufferCopyAsync(device_src, src, device_dst, tmp_view, &hstream); if (result != NANOARROW_OK) { ArrowBufferReset(&tmp); return result; } @@ -352,7 +426,7 @@ static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device, - void* sync_event, + void* sync_event, void* stream, struct ArrowError* error) { - if (sync_event == NULL) { + if (sync_event == NULL && stream == NULL) { return NANOARROW_OK; } @@ -363,11 +437,33 @@ static ArrowErrorCode ArrowDeviceCudaSynchronize(struct ArrowDevice* device, return ENOTSUP; } + // Sync functions require a context to be set + struct ArrowDeviceCudaPrivate* private_data = + (struct ArrowDeviceCudaPrivate*)device->private_data; + + NANOARROW_CUDA_RETURN_NOT_OK(cuCtxPushCurrent(private_data->cu_context), + "cuCtxPushCurrent", NULL); + CUcontext unused; // needed for cuCtxPopCurrent() + // Memory for cuda_event is owned by the ArrowArray member of the ArrowDeviceArray - CUevent* cuda_event = (CUevent*)sync_event; - NANOARROW_CUDA_RETURN_NOT_OK(cuEventSynchronize(*cuda_event), "cuEventSynchronize", - error); + CUevent* cu_event = (CUevent*)sync_event; + CUstream* cu_stream = (CUstream*)stream; + CUresult err; + const char* op = ""; + + if (cu_stream == NULL && cu_event != NULL) { + err = cuEventSynchronize(*cu_event); + op = "cuEventSynchronize"; + } else if (cu_stream != NULL && cu_event == NULL) { + err = cuStreamSynchronize(*cu_stream); + op = "cuStreamSynchronize"; + } else if (cu_stream != NULL && cu_event != NULL) { + err = cuStreamWaitEvent(*cu_stream, *cu_event, CU_EVENT_WAIT_DEFAULT); + op = "cuStreamWaitEvent"; + } + NANOARROW_CUDA_ASSERT_OK(cuCtxPopCurrent(&unused)); + NANOARROW_CUDA_RETURN_NOT_OK(err, op, error); return NANOARROW_OK; } @@ -384,7 +480,7 @@ static ArrowErrorCode ArrowDeviceCudaArrayMove(struct ArrowDevice* device_src, // We do have to wait on the sync event, though, because this has to be NULL // for a CPU device array.
NANOARROW_RETURN_NOT_OK( - ArrowDeviceCudaSynchronize(device_src, src->sync_event, NULL)); + ArrowDeviceCudaSynchronize(device_src, src->sync_event, NULL, NULL)); ArrowDeviceArrayMove(src, dst); dst->device_type = device_dst->device_type; dst->device_id = device_dst->device_id; @@ -436,11 +532,11 @@ static ArrowErrorCode ArrowDeviceCudaInitDevice(struct ArrowDevice* device, device->device_type = device_type; device->device_id = device_id; - device->array_init = &ArrowDeviceCudaArrayInit; + device->array_init = &ArrowDeviceCudaArrayInitAsync; device->array_move = &ArrowDeviceCudaArrayMove; - device->buffer_init = &ArrowDeviceCudaBufferInit; + device->buffer_init = &ArrowDeviceCudaBufferInitAsync; device->buffer_move = NULL; - device->buffer_copy = &ArrowDeviceCudaBufferCopy; + device->buffer_copy = &ArrowDeviceCudaBufferCopyAsync; device->synchronize_event = &ArrowDeviceCudaSynchronize; device->release = &ArrowDeviceCudaRelease; diff --git a/src/nanoarrow/device/cuda_test.cc b/src/nanoarrow/device/cuda_test.cc index ee89fa056..4b66d43bf 100644 --- a/src/nanoarrow/device/cuda_test.cc +++ b/src/nanoarrow/device/cuda_test.cc @@ -55,6 +55,66 @@ class CudaTemporaryContext { CUcontext context_; }; +class CudaStream { + public: + CudaStream(int64_t device_id) : device_id_(device_id), hstream_(0) {} + + ArrowErrorCode Init() { + CudaTemporaryContext ctx(device_id_); + if (!ctx.valid()) { + return EINVAL; + } + + if (cuStreamCreate(&hstream_, CU_STREAM_DEFAULT) != CUDA_SUCCESS) { + return EINVAL; + } + + return NANOARROW_OK; + } + + CUstream* get() { return &hstream_; } + + ~CudaStream() { + if (hstream_ != 0) { + cuStreamDestroy(hstream_); + } + } + + int64_t device_id_; + CUstream hstream_; +}; + +class CudaEvent { + public: + CudaEvent(int64_t device_id) : device_id_(device_id), hevent_(nullptr) {} + + ArrowErrorCode Init() { + CudaTemporaryContext ctx(device_id_); + if (!ctx.valid()) { + return EINVAL; + } + + if (cuEventCreate(&hevent_, CU_EVENT_DEFAULT) != CUDA_SUCCESS) { + return EINVAL; + } + + return NANOARROW_OK; + } + + CUevent* get() { return &hevent_; } + + void release() { hevent_ = nullptr; } + + ~CudaEvent() { + if (hevent_ != nullptr) { + cuEventDestroy(hevent_); + } + } + + int64_t device_id_; + CUevent hevent_; +}; + TEST(NanoarrowDeviceCuda, GetDevice) { struct ArrowDevice* cuda = ArrowDeviceCuda(ARROW_DEVICE_CUDA, 0); ASSERT_NE(cuda, nullptr); @@ -79,24 +139,35 @@ TEST(NanoarrowDeviceCuda, DeviceCudaBufferInit) { uint8_t data[] = {0x01, 0x02, 0x03, 0x04, 0x05}; struct ArrowBufferView cpu_view = {data, sizeof(data)}; + CudaStream stream(gpu->device_id); + ASSERT_EQ(stream.Init(), NANOARROW_OK); + + // Failing to provide a stream should error + ASSERT_EQ(ArrowDeviceBufferInitAsync(cpu, cpu_view, gpu, nullptr, nullptr), EINVAL); + // CPU -> GPU - ASSERT_EQ(ArrowDeviceBufferInit(cpu, cpu_view, gpu, &buffer_gpu), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceBufferInitAsync(cpu, cpu_view, gpu, &buffer_gpu, stream.get()), + NANOARROW_OK); EXPECT_EQ(buffer_gpu.size_bytes, sizeof(data)); // (Content is tested on the roundtrip) struct ArrowBufferView gpu_view = {buffer_gpu.data, buffer_gpu.size_bytes}; // GPU -> GPU - ASSERT_EQ(ArrowDeviceBufferInit(gpu, gpu_view, gpu, &buffer), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceBufferInitAsync(gpu, gpu_view, gpu, &buffer, stream.get()), + NANOARROW_OK); EXPECT_EQ(buffer.size_bytes, sizeof(data)); // (Content is tested on the roundtrip) ArrowBufferReset(&buffer); // GPU -> CPU - ASSERT_EQ(ArrowDeviceBufferInit(gpu, gpu_view, cpu, &buffer), NANOARROW_OK); + 
ASSERT_EQ(ArrowDeviceBufferInitAsync(gpu, gpu_view, cpu, &buffer, stream.get()), + NANOARROW_OK); EXPECT_EQ(buffer.size_bytes, sizeof(data)); + + ASSERT_EQ(cuStreamSynchronize(*stream.get()), CUDA_SUCCESS); EXPECT_EQ(memcmp(buffer.data, data, sizeof(data)), 0); - ArrowBufferReset(&buffer); + ArrowBufferReset(&buffer); ArrowBufferReset(&buffer_gpu); } @@ -110,25 +181,33 @@ TEST(NanoarrowDeviceCuda, DeviceCudaHostBufferInit) { uint8_t data[] = {0x01, 0x02, 0x03, 0x04, 0x05}; struct ArrowBufferView cpu_view = {data, sizeof(data)}; + CudaStream stream(gpu->device_id); + ASSERT_EQ(stream.Init(), NANOARROW_OK); + // CPU -> GPU - ASSERT_EQ(ArrowDeviceBufferInit(cpu, cpu_view, gpu, &buffer_gpu), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceBufferInitAsync(cpu, cpu_view, gpu, &buffer_gpu, stream.get()), + NANOARROW_OK); EXPECT_EQ(buffer_gpu.size_bytes, sizeof(data)); EXPECT_EQ(memcmp(buffer_gpu.data, data, sizeof(data)), 0); // Here, "GPU" is memory in the CPU space allocated by cudaMallocHost struct ArrowBufferView gpu_view = {buffer_gpu.data, buffer_gpu.size_bytes}; // GPU -> GPU - ASSERT_EQ(ArrowDeviceBufferInit(gpu, gpu_view, gpu, &buffer), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceBufferInitAsync(gpu, gpu_view, gpu, &buffer, stream.get()), + NANOARROW_OK); EXPECT_EQ(buffer.size_bytes, sizeof(data)); EXPECT_EQ(memcmp(buffer.data, data, sizeof(data)), 0); ArrowBufferReset(&buffer); // GPU -> CPU - ASSERT_EQ(ArrowDeviceBufferInit(gpu, gpu_view, cpu, &buffer), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceBufferInitAsync(gpu, gpu_view, cpu, &buffer, stream.get()), + NANOARROW_OK); EXPECT_EQ(buffer.size_bytes, sizeof(data)); + + ASSERT_EQ(cuStreamSynchronize(*stream.get()), CUDA_SUCCESS); EXPECT_EQ(memcmp(buffer.data, data, sizeof(data)), 0); - ArrowBufferReset(&buffer); + ArrowBufferReset(&buffer); ArrowBufferReset(&buffer_gpu); } @@ -157,18 +236,28 @@ TEST(NanoarrowDeviceCuda, DeviceCudaBufferCopy) { GTEST_FAIL() << "cuMemAlloc() failed"; } + CudaStream stream(gpu->device_id); + ASSERT_EQ(stream.Init(), NANOARROW_OK); + + // Failing to provide a stream should error + ASSERT_EQ(ArrowDeviceBufferCopyAsync(cpu, cpu_view, gpu, gpu_view, nullptr), EINVAL); + // CPU -> GPU - ASSERT_EQ(ArrowDeviceBufferCopy(cpu, cpu_view, gpu, gpu_view), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceBufferCopyAsync(cpu, cpu_view, gpu, gpu_view, stream.get()), + NANOARROW_OK); // GPU -> GPU - ASSERT_EQ(ArrowDeviceBufferCopy(gpu, gpu_view, gpu, gpu_view2), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceBufferCopyAsync(gpu, gpu_view, gpu, gpu_view2, stream.get()), + NANOARROW_OK); // GPU -> CPU uint8_t cpu_dest[5]; struct ArrowBufferView cpu_dest_view = {cpu_dest, sizeof(data)}; - ASSERT_EQ(ArrowDeviceBufferCopy(gpu, gpu_view, cpu, cpu_dest_view), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceBufferCopyAsync(gpu, gpu_view, cpu, cpu_dest_view, stream.get()), + NANOARROW_OK); // Check roundtrip + ASSERT_EQ(cuStreamSynchronize(*stream.get()), CUDA_SUCCESS); EXPECT_EQ(memcmp(cpu_dest, data, sizeof(data)), 0); // Clean up @@ -183,6 +272,56 @@ TEST(NanoarrowDeviceCuda, DeviceCudaBufferCopy) { } } +TEST(NanoarrowDeviceCuda, DeviceCudaArrayInit) { + struct ArrowDevice* gpu = ArrowDeviceCuda(ARROW_DEVICE_CUDA, 0); + + CudaStream stream(gpu->device_id); + ASSERT_EQ(stream.Init(), NANOARROW_OK); + + CudaEvent event(gpu->device_id); + ASSERT_EQ(event.Init(), NANOARROW_OK); + + struct ArrowDeviceArray device_array; + struct ArrowArray array; + array.release = nullptr; + + // No provided sync event should result in a null sync event in the final array + 
ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_INT32), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceArrayInit(gpu, &device_array, &array, nullptr), NANOARROW_OK); + ASSERT_EQ(device_array.sync_event, nullptr); + ArrowArrayRelease(&device_array.array); + + // Provided sync event should result in ownership of the event being taken by the + // device array. + device_array.sync_event = nullptr; + ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_INT32), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceArrayInit(gpu, &device_array, &array, event.get()), NANOARROW_OK); + ASSERT_EQ(*((CUevent*)device_array.sync_event), *event.get()); + event.release(); + ArrowArrayRelease(&device_array.array); + + // Provided stream without provided event should result in an event created by and owned + // by the device array + device_array.sync_event = nullptr; + ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_INT32), NANOARROW_OK); + ASSERT_EQ(ArrowDeviceArrayInitAsync(gpu, &device_array, &array, nullptr, stream.get()), + NANOARROW_OK); + ASSERT_NE(*(CUevent*)device_array.sync_event, nullptr); + ArrowArrayRelease(&device_array.array); + + // Provided stream and sync event should result in the device array taking ownership + // and recording the event + ASSERT_EQ(event.Init(), NANOARROW_OK); + device_array.sync_event = nullptr; + ASSERT_EQ(ArrowArrayInitFromType(&array, NANOARROW_TYPE_INT32), NANOARROW_OK); + ASSERT_EQ( + ArrowDeviceArrayInitAsync(gpu, &device_array, &array, event.get(), stream.get()), + NANOARROW_OK); + ASSERT_EQ(*((CUevent*)device_array.sync_event), *event.get()); + event.release(); + ArrowArrayRelease(&device_array.array); +} + class StringTypeParameterizedTestFixture : public ::testing::TestWithParam<TestParams> { protected: @@ -207,6 +346,10 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceCudaArrayViewString) { bool include_null = std::get<2>(GetParam()); int64_t expected_data_size; // expected + CudaStream stream(gpu->device_id); + ASSERT_EQ(stream.Init(), NANOARROW_OK); + + // Create some test data ASSERT_EQ(ArrowArrayInitFromType(&array, string_type), NANOARROW_OK); ASSERT_EQ(ArrowArrayStartAppending(&array), NANOARROW_OK); ASSERT_EQ(ArrowArrayAppendString(&array, "abc"_asv), NANOARROW_OK); @@ -230,19 +373,25 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceCudaArrayViewString) { EXPECT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, expected_data_size); EXPECT_EQ(device_array.array.length, 3); + // Failing to provide a stream should error + ASSERT_EQ(ArrowDeviceArrayViewCopyAsync(&device_array_view, gpu, nullptr, nullptr), + EINVAL); + // Copy required to Cuda struct ArrowDeviceArray device_array2; device_array2.array.release = nullptr; ASSERT_EQ(ArrowDeviceArrayMoveToDevice(&device_array, gpu, &device_array2), ENOTSUP); - ASSERT_EQ(ArrowDeviceArrayViewCopy(&device_array_view, gpu, &device_array2), + ASSERT_EQ(ArrowDeviceArrayViewCopyAsync(&device_array_view, gpu, &device_array2, + stream.get()), NANOARROW_OK); ArrowArrayRelease(&device_array.array); ASSERT_NE(device_array2.array.release, nullptr); ASSERT_EQ(device_array2.device_id, gpu->device_id); - ASSERT_EQ(ArrowDeviceArrayViewSetArray(&device_array_view, &device_array2, nullptr), - NANOARROW_OK); - EXPECT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, expected_data_size); + ASSERT_EQ( + ArrowDeviceArrayViewSetArrayMinimal(&device_array_view, &device_array2, nullptr), + NANOARROW_OK); + EXPECT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, -1);
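+ // (ArrowDeviceArrayViewSetArrayMinimal() never copies from the device, so the + // variable-length data buffer size is still the unknown-size sentinel -1 here; the + // later ArrowDeviceArrayViewSetArray() call in this test resolves it from the device.)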
EXPECT_EQ(device_array_view.array_view.length, 3); EXPECT_EQ(device_array2.array.length, 3); @@ -251,7 +400,8 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceCudaArrayViewString) { ASSERT_EQ(ArrowDeviceArrayMoveToDevice(&device_array2, cpu, &device_array), NANOARROW_OK); } else { - ASSERT_EQ(ArrowDeviceArrayViewCopy(&device_array_view, cpu, &device_array), + ASSERT_EQ(ArrowDeviceArrayViewCopyAsync(&device_array_view, cpu, &device_array, + stream.get()), NANOARROW_OK); ArrowArrayRelease(&device_array2.array); } @@ -261,8 +411,9 @@ TEST_P(StringTypeParameterizedTestFixture, ArrowDeviceCudaArrayViewString) { ASSERT_EQ(ArrowDeviceArrayViewSetArray(&device_array_view, &device_array, nullptr), NANOARROW_OK); - EXPECT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, expected_data_size); + ASSERT_EQ(device_array_view.array_view.buffer_views[2].size_bytes, expected_data_size); + ASSERT_EQ(cuStreamSynchronize(*stream.get()), CUDA_SUCCESS); if (include_null) { EXPECT_EQ( memcmp(device_array_view.array_view.buffer_views[2].data.data, "abcdefg", 7), 0); @@ -294,6 +445,4 @@ INSTANTIATE_TEST_SUITE_P( TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_BINARY, true), TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_BINARY, false), TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_LARGE_BINARY, true), - TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_LARGE_BINARY, false) - - )); + TestParams(ARROW_DEVICE_CUDA_HOST, NANOARROW_TYPE_LARGE_BINARY, false))); diff --git a/src/nanoarrow/device/device.c b/src/nanoarrow/device/device.c index 14b57b5dc..352db2537 100644 --- a/src/nanoarrow/device/device.c +++ b/src/nanoarrow/device/device.c @@ -43,15 +43,20 @@ static void ArrowDeviceArrayInitDefault(struct ArrowDevice* device, ArrowArrayMove(array, &device_array->array); } -static ArrowErrorCode ArrowDeviceCpuBufferInit(struct ArrowDevice* device_src, - struct ArrowBufferView src, - struct ArrowDevice* device_dst, - struct ArrowBuffer* dst) { +static ArrowErrorCode ArrowDeviceCpuBufferInitAsync(struct ArrowDevice* device_src, + struct ArrowBufferView src, + struct ArrowDevice* device_dst, + struct ArrowBuffer* dst, + void* stream) { if (device_dst->device_type != ARROW_DEVICE_CPU || device_src->device_type != ARROW_DEVICE_CPU) { return ENOTSUP; } + if (stream != NULL) { + return EINVAL; + } + ArrowBufferInit(dst); dst->allocator = ArrowBufferAllocatorDefault(); NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(dst, src.data.as_uint8, src.size_bytes)); @@ -74,30 +79,35 @@ static ArrowErrorCode ArrowDeviceCpuBufferMove(struct ArrowDevice* device_src, static ArrowErrorCode ArrowDeviceCpuBufferCopy(struct ArrowDevice* device_src, struct ArrowBufferView src, struct ArrowDevice* device_dst, - struct ArrowBufferView dst) { + struct ArrowBufferView dst, void* stream) { if (device_dst->device_type != ARROW_DEVICE_CPU || device_src->device_type != ARROW_DEVICE_CPU) { return ENOTSUP; } + if (stream != NULL) { + return EINVAL; + } + memcpy((uint8_t*)dst.data.as_uint8, src.data.as_uint8, dst.size_bytes); return NANOARROW_OK; } static ArrowErrorCode ArrowDeviceCpuSynchronize(struct ArrowDevice* device, - void* sync_event, + void* sync_event, void* stream, struct ArrowError* error) { switch (device->device_type) { case ARROW_DEVICE_CPU: - if (sync_event != NULL) { - ArrowErrorSet(error, "Expected NULL sync_event for ARROW_DEVICE_CPU but got %p", - sync_event); + if (sync_event != NULL || stream != NULL) { + ArrowErrorSet(error, "sync_event and stream must be NULL for ARROW_DEVICE_CPU"); return EINVAL; } else { return 
NANOARROW_OK; } default: - return device->synchronize_event(device, sync_event, error); + ArrowErrorSet(error, "Expected CPU device but got device type %d", + (int)device->device_type); + return ENOTSUP; } } @@ -118,7 +128,7 @@ void ArrowDeviceInitCpu(struct ArrowDevice* device) { device->device_id = -1; device->array_init = NULL; device->array_move = NULL; - device->buffer_init = &ArrowDeviceCpuBufferInit; + device->buffer_init = &ArrowDeviceCpuBufferInitAsync; device->buffer_move = &ArrowDeviceCpuBufferMove; device->buffer_copy = &ArrowDeviceCpuBufferCopy; device->synchronize_event = &ArrowDeviceCpuSynchronize; @@ -145,15 +155,16 @@ struct ArrowDevice* ArrowDeviceResolve(ArrowDeviceType device_type, int64_t devi return NULL; } -ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device, - struct ArrowDeviceArray* device_array, - struct ArrowArray* array, void* sync_event) { +ArrowErrorCode ArrowDeviceArrayInitAsync(struct ArrowDevice* device, + struct ArrowDeviceArray* device_array, + struct ArrowArray* array, void* sync_event, + void* stream) { if (device->array_init != NULL) { - return device->array_init(device, device_array, array, sync_event); + return device->array_init(device, device_array, array, sync_event, stream); } - // Handling a sync event is not supported in the default constructor - if (sync_event != NULL) { + // Sync event and stream aren't handled by the fallback implementation + if (sync_event != NULL || stream != NULL) { return EINVAL; } @@ -161,13 +172,13 @@ ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device, return NANOARROW_OK; } -ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* device_src, - struct ArrowBufferView src, - struct ArrowDevice* device_dst, - struct ArrowBuffer* dst) { - int result = device_dst->buffer_init(device_src, src, device_dst, dst); +ArrowErrorCode ArrowDeviceBufferInitAsync(struct ArrowDevice* device_src, + struct ArrowBufferView src, + struct ArrowDevice* device_dst, + struct ArrowBuffer* dst, void* stream) { + int result = device_dst->buffer_init(device_src, src, device_dst, dst, stream); if (result == ENOTSUP) { - result = device_src->buffer_init(device_src, src, device_dst, dst); + result = device_src->buffer_init(device_src, src, device_dst, dst, stream); } return result; @@ -185,13 +196,13 @@ ArrowErrorCode ArrowDeviceBufferMove(struct ArrowDevice* device_src, return result; } -ArrowErrorCode ArrowDeviceBufferCopy(struct ArrowDevice* device_src, - struct ArrowBufferView src, - struct ArrowDevice* device_dst, - struct ArrowBufferView dst) { - int result = device_dst->buffer_copy(device_src, src, device_dst, dst); +ArrowErrorCode ArrowDeviceBufferCopyAsync(struct ArrowDevice* device_src, + struct ArrowBufferView src, + struct ArrowDevice* device_dst, + struct ArrowBufferView dst, void* stream) { + int result = device_dst->buffer_copy(device_src, src, device_dst, dst, stream); if (result == ENOTSUP) { - result = device_src->buffer_copy(device_src, src, device_dst, dst); + result = device_src->buffer_copy(device_src, src, device_dst, dst, stream); } return result; @@ -273,64 +284,106 @@ void ArrowDeviceArrayViewReset(struct ArrowDeviceArrayView* device_array_view) { device_array_view->device = NULL; } -static ArrowErrorCode ArrowDeviceBufferGetInt32(struct ArrowDevice* device, - struct ArrowBufferView buffer_view, - int64_t i, int32_t* out) { - struct ArrowBufferView out_view; - out_view.data.as_int32 = out; - out_view.size_bytes = sizeof(int32_t); +ArrowErrorCode ArrowDeviceArrayViewSetArrayMinimal( + struct 
ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* device_array, + struct ArrowError* error) { + // Resolve device + struct ArrowDevice* device = + ArrowDeviceResolve(device_array->device_type, device_array->device_id); + if (device == NULL) { + ArrowErrorSet(error, + "Can't resolve device with type %" PRId32 " and identifier %" PRId64, + device_array->device_type, device_array->device_id); + return EINVAL; + } + + // Set the device array device + device_array_view->device = device; + + // Populate the array_view + NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArrayMinimal(&device_array_view->array_view, + &device_array->array, error)); + + // Populate the sync_event + device_array_view->sync_event = device_array->sync_event; - struct ArrowBufferView device_buffer_view; - device_buffer_view.data.as_int32 = buffer_view.data.as_int32 + i; - device_buffer_view.size_bytes = sizeof(int32_t); - NANOARROW_RETURN_NOT_OK( - ArrowDeviceBufferCopy(device, device_buffer_view, ArrowDeviceCpu(), out_view)); return NANOARROW_OK; } -static ArrowErrorCode ArrowDeviceBufferGetInt64(struct ArrowDevice* device, - struct ArrowBufferView buffer_view, - int64_t i, int64_t* out) { - struct ArrowBufferView out_view; - out_view.data.as_int64 = out; - out_view.size_bytes = sizeof(int64_t); +// Walks the tree of arrays to count the number of buffers with unknown size +// and the number of bytes we need to copy from a device buffer to find it. +static ArrowErrorCode ArrowDeviceArrayViewWalkUnknownBufferSizes( + struct ArrowArrayView* array_view, int64_t* offset_buffer_size) { + switch (array_view->storage_type) { + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_BINARY: + case NANOARROW_TYPE_LARGE_STRING: + case NANOARROW_TYPE_LARGE_BINARY: + if (array_view->length == 0 || array_view->buffer_views[1].size_bytes == 0) { + array_view->buffer_views[2].size_bytes = 0; + } else if (array_view->buffer_views[2].size_bytes == -1) { + *offset_buffer_size += array_view->layout.element_size_bits[1] / 8; + } + break; + default: + break; + } + + // Recurse for children + for (int64_t i = 0; i < array_view->n_children; i++) { + NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewWalkUnknownBufferSizes( + array_view->children[i], offset_buffer_size)); + } + + // ...and for dictionary + if (array_view->dictionary != NULL) { + NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewWalkUnknownBufferSizes( + array_view->dictionary, offset_buffer_size)); + } - struct ArrowBufferView device_buffer_view; - device_buffer_view.data.as_int64 = buffer_view.data.as_int64 + i; - device_buffer_view.size_bytes = sizeof(int64_t); - NANOARROW_RETURN_NOT_OK( - ArrowDeviceBufferCopy(device, device_buffer_view, ArrowDeviceCpu(), out_view)); return NANOARROW_OK; } -static ArrowErrorCode ArrowDeviceArrayViewResolveBufferSizes( - struct ArrowDevice* device, struct ArrowArrayView* array_view) { - // Calculate buffer sizes that require accessing the offset buffer - // (at this point all other sizes have been resolved). +// Walks the tree of arrays and launches an async copy of the relevant +// item in the array's offset buffer to the temporary buffer we've just +// allocated to collect these values. 
+static ArrowErrorCode ArrowDeviceArrayViewResolveUnknownBufferSizesAsync( + struct ArrowDevice* device, struct ArrowArrayView* array_view, + uint8_t** offset_value_dst, void* stream) { int64_t offset_plus_length = array_view->offset + array_view->length; - int32_t last_offset32; - int64_t last_offset64; + + struct ArrowBufferView src_view; + struct ArrowBufferView dst_view; switch (array_view->storage_type) { case NANOARROW_TYPE_STRING: case NANOARROW_TYPE_BINARY: - if (array_view->buffer_views[1].size_bytes == 0) { - array_view->buffer_views[2].size_bytes = 0; - } else if (array_view->buffer_views[2].size_bytes == -1) { - NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferGetInt32( - device, array_view->buffer_views[1], offset_plus_length, &last_offset32)); - array_view->buffer_views[2].size_bytes = last_offset32; + if (array_view->buffer_views[2].size_bytes == -1) { + src_view.data.as_int32 = + array_view->buffer_views[1].data.as_int32 + offset_plus_length; + src_view.size_bytes = sizeof(int32_t); + dst_view.data.as_uint8 = *offset_value_dst; + dst_view.size_bytes = sizeof(int32_t); + + NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferCopyAsync( + device, src_view, ArrowDeviceCpu(), dst_view, stream)); + + (*offset_value_dst) += sizeof(int32_t); } break; - case NANOARROW_TYPE_LARGE_STRING: case NANOARROW_TYPE_LARGE_BINARY: - if (array_view->buffer_views[1].size_bytes == 0) { - array_view->buffer_views[2].size_bytes = 0; - } else if (array_view->buffer_views[2].size_bytes == -1) { - NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferGetInt64( - device, array_view->buffer_views[1], offset_plus_length, &last_offset64)); - array_view->buffer_views[2].size_bytes = last_offset64; + if (array_view->buffer_views[2].size_bytes == -1) { + src_view.data.as_int64 = + array_view->buffer_views[1].data.as_int64 + offset_plus_length; + src_view.size_bytes = sizeof(int64_t); + dst_view.data.as_uint8 = *offset_value_dst; + dst_view.size_bytes = sizeof(int64_t); + + NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferCopyAsync( + device, src_view, ArrowDeviceCpu(), dst_view, stream)); + + (*offset_value_dst) += sizeof(int64_t); } break; default: @@ -339,55 +392,117 @@ static ArrowErrorCode ArrowDeviceArrayViewResolveBufferSizes( // Recurse for children for (int64_t i = 0; i < array_view->n_children; i++) { - NANOARROW_RETURN_NOT_OK( - ArrowDeviceArrayViewResolveBufferSizes(device, array_view->children[i])); + NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewResolveUnknownBufferSizesAsync( + device, array_view->children[i], offset_value_dst, stream)); + } + + // ...and for dictionary + if (array_view->dictionary != NULL) { + NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewResolveUnknownBufferSizesAsync( + device, array_view->dictionary, offset_value_dst, stream)); } return NANOARROW_OK; } -ArrowErrorCode ArrowDeviceArrayViewSetArrayMinimal( - struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* device_array, +// After synchronizing the stream with the CPU to ensure that all of the +// buffer sizes have been copied to our temporary buffer, relay them +// back to the appropriate buffer view so that the buffer copier can +// do its thing.
+static void ArrowDeviceArrayViewCollectUnknownBufferSizes( + struct ArrowArrayView* array_view, uint8_t** offset_value_dst) { + switch (array_view->storage_type) { + case NANOARROW_TYPE_STRING: + case NANOARROW_TYPE_BINARY: + if (array_view->buffer_views[2].size_bytes == -1) { + int32_t size_bytes_32; + memcpy(&size_bytes_32, *offset_value_dst, sizeof(int32_t)); + array_view->buffer_views[2].size_bytes = size_bytes_32; + (*offset_value_dst) += sizeof(int32_t); + } + break; + case NANOARROW_TYPE_LARGE_STRING: + case NANOARROW_TYPE_LARGE_BINARY: + if (array_view->buffer_views[2].size_bytes == -1) { + memcpy(&array_view->buffer_views[2].size_bytes, *offset_value_dst, + sizeof(int64_t)); + (*offset_value_dst) += sizeof(int64_t); + } + break; + default: + break; + } + + // Recurse for children + for (int64_t i = 0; i < array_view->n_children; i++) { + ArrowDeviceArrayViewCollectUnknownBufferSizes(array_view->children[i], + offset_value_dst); + } + + // ...and for dictionary + if (array_view->dictionary != NULL) { + ArrowDeviceArrayViewCollectUnknownBufferSizes(array_view->dictionary, + offset_value_dst); + } +} + +static ArrowErrorCode ArrowDeviceArrayViewEnsureBufferSizesAsync( + struct ArrowDeviceArrayView* device_array_view, void* stream, struct ArrowError* error) { - // Resolve device - struct ArrowDevice* device = - ArrowDeviceResolve(device_array->device_type, device_array->device_id); - if (device == NULL) { - ArrowErrorSet(error, - "Can't resolve device with type %" PRId32 " and identifier %" PRId64, - device_array->device_type, device_array->device_id); - return EINVAL; + // Walk the tree of arrays to check for buffers whose size we don't know + int64_t temp_buffer_length_bytes_required = 0; + NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewWalkUnknownBufferSizes( + &device_array_view->array_view, &temp_buffer_length_bytes_required)); + + // If there are no such arrays (e.g., there are no string or binary arrays in the tree), + // we don't have to do anything extra + if (temp_buffer_length_bytes_required == 0) { + return NANOARROW_OK; } - // Set the device array device - device_array_view->device = device; + // Ensure that the stream provided waits on the array's sync event + NANOARROW_RETURN_NOT_OK(device_array_view->device->synchronize_event( + device_array_view->device, device_array_view->sync_event, stream, error)); - // Populate the array_view - NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArrayMinimal(&device_array_view->array_view, - &device_array->array, error)); + // Allocate a buffer big enough to hold all the offset values we need to + // copy from the GPU + struct ArrowBuffer buffer; + ArrowBufferInit(&buffer); + NANOARROW_RETURN_NOT_OK( + ArrowBufferResize(&buffer, temp_buffer_length_bytes_required, 0)); + + uint8_t* cursor = buffer.data; + int result = ArrowDeviceArrayViewResolveUnknownBufferSizesAsync( + device_array_view->device, &device_array_view->array_view, &cursor, stream); + if (result != NANOARROW_OK) { + ArrowBufferReset(&buffer); + return result; + } + + NANOARROW_DCHECK(cursor == (buffer.data + buffer.size_bytes)); + + // Synchronize the stream with the CPU + result = device_array_view->device->synchronize_event(device_array_view->device, NULL, + stream, error); + if (result != NANOARROW_OK) { + ArrowBufferReset(&buffer); + return result; + } + + // Collect the values from the temporary buffer + cursor = buffer.data; + ArrowDeviceArrayViewCollectUnknownBufferSizes(&device_array_view->array_view, &cursor); + NANOARROW_DCHECK(cursor == (buffer.data + buffer.size_bytes)); + ArrowBufferReset(&buffer); return NANOARROW_OK; } -ArrowErrorCode 
ArrowDeviceArrayViewSetArray( +ArrowErrorCode ArrowDeviceArrayViewSetArrayAsync( struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* device_array, - struct ArrowError* error) { + void* stream, struct ArrowError* error) { + // Populate the array view with all information accessible from the CPU NANOARROW_RETURN_NOT_OK( ArrowDeviceArrayViewSetArrayMinimal(device_array_view, device_array, error)); - // Wait on device_array to synchronize with the CPU - // TODO: This is not actually sufficient for CUDA, where the synchronization - // should happen after the cudaMemcpy, not before it. The ordering of - // these operations should be explicit and asynchronous (and is probably outside - // the scope of what can be done with a generic callback). - NANOARROW_RETURN_NOT_OK(device_array_view->device->synchronize_event( - device_array_view->device, device_array->sync_event, error)); - - // Resolve unknown buffer sizes (i.e., string, binary, large string, large binary) - NANOARROW_RETURN_NOT_OK_WITH_ERROR( - ArrowDeviceArrayViewResolveBufferSizes(device_array_view->device, - &device_array_view->array_view), - error); + NANOARROW_RETURN_NOT_OK( + ArrowDeviceArrayViewEnsureBufferSizesAsync(device_array_view, stream, error)); return NANOARROW_OK; } @@ -395,7 +510,8 @@ ArrowErrorCode ArrowDeviceArrayViewSetArray( static ArrowErrorCode ArrowDeviceArrayViewCopyInternal(struct ArrowDevice* device_src, struct ArrowArrayView* src, struct ArrowDevice* device_dst, - struct ArrowArray* dst) { + struct ArrowArray* dst, + void* stream) { // Currently no attempt to minimize the amount of memory copied (i.e., // by applying offset + length and copying potentially fewer bytes) dst->length = src->length; @@ -407,43 +523,58 @@ static ArrowErrorCode ArrowDeviceArrayViewCopyInternal(struct ArrowDevice* devic break; } - NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferInit(device_src, src->buffer_views[i], - device_dst, ArrowArrayBuffer(dst, i))); + NANOARROW_RETURN_NOT_OK(ArrowDeviceBufferInitAsync( + device_src, src->buffer_views[i], device_dst, ArrowArrayBuffer(dst, i), stream)); } for (int64_t i = 0; i < src->n_children; i++) { NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewCopyInternal( - device_src, src->children[i], device_dst, dst->children[i])); + device_src, src->children[i], device_dst, dst->children[i], stream)); } if (src->dictionary != NULL) { NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewCopyInternal( - device_src, src->dictionary, device_dst, dst->dictionary)); + device_src, src->dictionary, device_dst, dst->dictionary, stream)); } return NANOARROW_OK; } -ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src, - struct ArrowDevice* device_dst, - struct ArrowDeviceArray* dst) { +ArrowErrorCode ArrowDeviceArrayViewCopyAsync(struct ArrowDeviceArrayView* src, + struct ArrowDevice* device_dst, + struct ArrowDeviceArray* dst, void* stream) { + // Ensure src has all buffer sizes defined + NANOARROW_RETURN_NOT_OK(ArrowDeviceArrayViewEnsureBufferSizesAsync(src, stream, NULL)); + struct ArrowArray tmp; NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromArrayView(&tmp, &src->array_view, NULL)); - int result = - ArrowDeviceArrayViewCopyInternal(src->device, &src->array_view, device_dst, &tmp); + int result = ArrowDeviceArrayViewCopyInternal(src->device, &src->array_view, device_dst, + &tmp, stream); if (result != NANOARROW_OK) { ArrowArrayRelease(&tmp); return result; } + // If we are copying to the CPU, we need to synchronize the stream because we + // can't populate a sync event for a CPU array. 
+ if (device_dst->device_type == ARROW_DEVICE_CPU) { + result = src->device->synchronize_event(src->device, NULL, stream, NULL); + if (result != NANOARROW_OK) { + ArrowArrayRelease(&tmp); + return result; + } + + stream = NULL; + } + result = ArrowArrayFinishBuilding(&tmp, NANOARROW_VALIDATION_LEVEL_MINIMAL, NULL); if (result != NANOARROW_OK) { ArrowArrayRelease(&tmp); return result; } - result = ArrowDeviceArrayInit(device_dst, dst, &tmp, NULL); + result = ArrowDeviceArrayInitAsync(device_dst, dst, &tmp, NULL, stream); if (result != NANOARROW_OK) { ArrowArrayRelease(&tmp); return result; diff --git a/src/nanoarrow/device/device_test.cc b/src/nanoarrow/device/device_test.cc index 6765baed8..a303bf340 100644 --- a/src/nanoarrow/device/device_test.cc +++ b/src/nanoarrow/device/device_test.cc @@ -32,9 +32,13 @@ TEST(NanoarrowDevice, CpuDevice) { EXPECT_EQ(cpu, ArrowDeviceCpu()); void* sync_event = nullptr; - EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, nullptr), NANOARROW_OK); + void* stream = nullptr; + EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, stream, nullptr), NANOARROW_OK); sync_event = cpu; - EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, nullptr), EINVAL); + EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, stream, nullptr), EINVAL); + sync_event = nullptr; + stream = cpu; + EXPECT_EQ(cpu->synchronize_event(cpu, sync_event, stream, nullptr), EINVAL); } TEST(NanoarrowDevice, ArrowDeviceCpuBuffer) { diff --git a/src/nanoarrow/device/metal.cc b/src/nanoarrow/device/metal.cc index 17083a9e0..78530ac17 100644 --- a/src/nanoarrow/device/metal.cc +++ b/src/nanoarrow/device/metal.cc @@ -150,20 +150,23 @@ static void ArrowDeviceMetalArrayRelease(struct ArrowArray* array) { } ArrowArrayRelease(&private_data->parent); ArrowFree(private_data); - array->release = NULL; + array->release = nullptr; } -static ArrowErrorCode ArrowDeviceMetalArrayInit(struct ArrowDevice* device, - struct ArrowDeviceArray* device_array, - struct ArrowArray* array, - void* sync_event) { +static ArrowErrorCode ArrowDeviceMetalArrayInitAsync( + struct ArrowDevice* device, struct ArrowDeviceArray* device_array, + struct ArrowArray* array, void* sync_event, void* stream) { + // Stream sync is not implemented for Metal; check before allocating so the + // error path doesn't leak private_data. + if (stream != nullptr) { + return EINVAL; + } + struct ArrowDeviceMetalArrayPrivate* private_data = (struct ArrowDeviceMetalArrayPrivate*)ArrowMalloc( sizeof(struct ArrowDeviceMetalArrayPrivate)); - if (private_data == NULL) { + if (private_data == nullptr) { return ENOMEM; } // One can create a new event with mtl_device->newSharedEvent(); private_data->event = static_cast<MTL::SharedEvent*>(sync_event); @@ -180,10 +183,11 @@ static ArrowErrorCode ArrowDeviceMetalArrayInit(struct ArrowDevice* device, return NANOARROW_OK; } -static ArrowErrorCode ArrowDeviceMetalBufferInit(struct ArrowDevice* device_src, - struct ArrowBufferView src, - struct ArrowDevice* device_dst, - struct ArrowBuffer* dst) { +static ArrowErrorCode ArrowDeviceMetalBufferInitAsync(struct ArrowDevice* device_src, + struct ArrowBufferView src, + struct ArrowDevice* device_dst, + struct ArrowBuffer* dst, + void* stream) { if (device_src->device_type == ARROW_DEVICE_CPU && device_dst->device_type == ARROW_DEVICE_METAL) { struct ArrowBuffer tmp; @@ -246,10 +250,11 @@ static ArrowErrorCode ArrowDeviceMetalBufferMove(struct ArrowDevice* device_src, } } -static ArrowErrorCode ArrowDeviceMetalBufferCopy(struct ArrowDevice* device_src, - struct ArrowBufferView src, - struct ArrowDevice* device_dst, - struct ArrowBufferView dst) { +static ArrowErrorCode ArrowDeviceMetalBufferCopyAsync(struct ArrowDevice* 
device_src, + struct ArrowBufferView src, + struct ArrowDevice* device_dst, + struct ArrowBufferView dst, + void* stream) { // This is all just memcpy since it's all living in the same address space if (device_src->device_type == ARROW_DEVICE_CPU && device_dst->device_type == ARROW_DEVICE_METAL) { @@ -292,7 +297,7 @@ static int ArrowDeviceMetalCopyRequiredCpuToMetal(MTL::Device* mtl_device, } static ArrowErrorCode ArrowDeviceMetalSynchronize(struct ArrowDevice* device, - void* sync_event, + void* sync_event, void* stream, struct ArrowError* error) { // TODO: sync events for Metal are harder than for CUDA // https://developer.apple.com/documentation/metal/resource_synchronization/synchronizing_events_between_a_gpu_and_the_cpu?language=objc @@ -310,6 +315,11 @@ static ArrowErrorCode ArrowDeviceMetalSynchronize(struct ArrowDevice* device, // listener->release(); + // The case where we actually have to do something is not implemented + if (sync_event != NULL || stream != NULL) { + return ENOTSUP; + } + return NANOARROW_OK; } @@ -334,7 +344,7 @@ static ArrowErrorCode ArrowDeviceMetalArrayMove(struct ArrowDevice* device_src, } else if (device_src->device_type == ARROW_DEVICE_METAL && device_dst->device_type == ARROW_DEVICE_CPU) { NANOARROW_RETURN_NOT_OK( - ArrowDeviceMetalSynchronize(device_src, src->sync_event, nullptr)); + ArrowDeviceMetalSynchronize(device_src, src->sync_event, nullptr, nullptr)); ArrowDeviceArrayMove(src, dst); dst->device_type = device_dst->device_type; dst->device_id = device_dst->device_id; @@ -376,11 +386,11 @@ ArrowErrorCode ArrowDeviceMetalInitDefaultDevice(struct ArrowDevice* device, device->device_type = ARROW_DEVICE_METAL; device->device_id = static_cast<int64_t>(default_device->registryID()); - device->array_init = &ArrowDeviceMetalArrayInit; + device->array_init = &ArrowDeviceMetalArrayInitAsync; device->array_move = &ArrowDeviceMetalArrayMove; - device->buffer_init = &ArrowDeviceMetalBufferInit; + device->buffer_init = &ArrowDeviceMetalBufferInitAsync; device->buffer_move = &ArrowDeviceMetalBufferMove; - device->buffer_copy = &ArrowDeviceMetalBufferCopy; + device->buffer_copy = &ArrowDeviceMetalBufferCopyAsync; device->synchronize_event = &ArrowDeviceMetalSynchronize; device->release = &ArrowDeviceMetalRelease; device->private_data = default_device; diff --git a/src/nanoarrow/nanoarrow_device.h b/src/nanoarrow/nanoarrow_device.h index b284a9aa3..ef5d3cca1 100644 --- a/src/nanoarrow/nanoarrow_device.h +++ b/src/nanoarrow/nanoarrow_device.h @@ -121,27 +121,28 @@ static inline void ArrowDeviceArrayMove(struct ArrowDeviceArray* src, #define ArrowDeviceCheckRuntime \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceCheckRuntime) -#define ArrowDeviceArrayInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayInit) +#define ArrowDeviceArrayInitAsync \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayInitAsync) #define ArrowDeviceArrayViewInit \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewInit) #define ArrowDeviceArrayViewReset \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewReset) #define ArrowDeviceArrayViewSetArrayMinimal \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewSetArrayMinimal) -#define ArrowDeviceArrayViewSetArray \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewSetArray) -#define ArrowDeviceArrayViewCopy \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewCopy) -#define ArrowDeviceArrayViewCopyRequired \ - NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewCopyRequired) +#define 
ArrowDeviceArrayViewSetArrayAsync \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewSetArrayAsync) +#define ArrowDeviceArrayViewCopyAsync \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayViewCopyAsync) #define ArrowDeviceArrayMoveToDevice \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceArrayMoveToDevice) #define ArrowDeviceResolve NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceResolve) #define ArrowDeviceCpu NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceCpu) #define ArrowDeviceInitCpu NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceInitCpu) -#define ArrowDeviceBufferInit NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBufferInit) +#define ArrowDeviceBufferInitAsync \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBufferInitAsync) #define ArrowDeviceBufferMove NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBufferMove) -#define ArrowDeviceBufferCopy NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBufferCopy) +#define ArrowDeviceBufferCopyAsync \ + NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBufferCopyAsync) #define ArrowDeviceBasicArrayStreamInit \ NANOARROW_SYMBOL(NANOARROW_NAMESPACE, ArrowDeviceBasicArrayStreamInit) @@ -168,6 +169,12 @@ static inline void ArrowDeviceArrayMove(struct ArrowDeviceArray* src, /// \brief Checks the nanoarrow runtime to make sure the run/build versions match ArrowErrorCode ArrowDeviceCheckRuntime(struct ArrowError* error); +struct ArrowDeviceArrayView { + struct ArrowDevice* device; + struct ArrowArrayView array_view; + void* sync_event; +}; + /// \brief A Device wrapper with callbacks for basic memory management tasks /// /// All device objects are currently implemented as singletons; however, this @@ -182,19 +189,22 @@ struct ArrowDevice { /// \brief Initialize an ArrowDeviceArray from a previously allocated ArrowArray /// /// Given a device and an uninitialized device_array, populate the fields of the - /// device_array (including sync_event) appropriately. If NANOARROW_OK is returned, - /// ownership of array is transferred to device_array. This function must allocate - /// the appropriate sync_event and make its address available as - /// device_array->sync_event (if sync_event applies to this device type). + /// device_array appropriately. If sync_event is non-null, ownership is transferred + /// to the output array. If stream is non-null, the event must be recorded such that + /// it captures the work done on stream. If NANOARROW_OK is returned, ownership of array + /// and sync_event is transferred to device_array. The caller retains ownership of + /// stream. ArrowErrorCode (*array_init)(struct ArrowDevice* device, struct ArrowDeviceArray* device_array, - struct ArrowArray* array, void* sync_event); + struct ArrowArray* array, void* sync_event, void* stream); /// \brief Move an ArrowDeviceArray between devices without copying buffers /// /// Some devices can move an ArrowDeviceArray without an explicit buffer copy, /// although the performance characteristics of the moved array may be different - /// than that of an explicitly copied one depending on the device. + /// than that of an explicitly copied one depending on the device. Implementations must + /// check device_src and device_dst and return ENOTSUP if not prepared to handle this + /// operation. 
ArrowErrorCode (*array_move)(struct ArrowDevice* device_src, struct ArrowDeviceArray* src, struct ArrowDevice* device_dst, @@ -203,38 +213,45 @@ struct ArrowDevice { /// \brief Initialize an owning buffer from existing content /// /// Creates a new buffer whose data member can be accessed by the GPU by - /// copying existing content. + /// copying existing content. Implementations must use the provided stream + /// if non-null; implementations may error if they require a stream to be provided. /// Implementations must check device_src and device_dst and return ENOTSUP if /// not prepared to handle this operation. ArrowErrorCode (*buffer_init)(struct ArrowDevice* device_src, struct ArrowBufferView src, - struct ArrowDevice* device_dst, struct ArrowBuffer* dst); + struct ArrowDevice* device_dst, struct ArrowBuffer* dst, + void* stream); /// \brief Move an owning buffer to a device /// /// Creates a new buffer whose data member can be accessed by the GPU by /// moving an existing buffer. If NANOARROW_OK is returned, src will have /// been released or moved by the implementation and dst must be released by - /// the caller. - /// Implementations must check device_src and device_dst and return ENOTSUP if - /// not prepared to handle this operation. + /// the caller. Implementations must check device_src and device_dst and return ENOTSUP + /// if not prepared to handle this operation. ArrowErrorCode (*buffer_move)(struct ArrowDevice* device_src, struct ArrowBuffer* src, struct ArrowDevice* device_dst, struct ArrowBuffer* dst); /// \brief Copy a section of memory into a preallocated buffer /// /// As opposed to the other buffer operations, this is designed to support - /// copying very small slices of memory. + /// copying very small slices of memory. Implementations must use the provided stream + /// if non-null; implementations may error if they require a stream to be provided. /// Implementations must check device_src and device_dst and return ENOTSUP if /// not prepared to handle this operation. ArrowErrorCode (*buffer_copy)(struct ArrowDevice* device_src, struct ArrowBufferView src, struct ArrowDevice* device_dst, - struct ArrowBufferView dst); + struct ArrowBufferView dst, void* stream); - /// \brief Wait for an event on the CPU host + /// \brief Synchronize an event and/or stream + /// + /// If both sync_event and stream are non-null, ensures that the stream waits + /// on the event. If only sync_event is non-null, ensures that the work captured + /// by the event is synchronized with the CPU. If only stream is non-null, ensures + /// that stream is synchronized with the CPU. ArrowErrorCode (*synchronize_event)(struct ArrowDevice* device, void* sync_event, - struct ArrowError* error); + void* stream, struct ArrowError* error); /// \brief Release this device and any resources it holds void (*release)(struct ArrowDevice* device); @@ -243,18 +260,50 @@ struct ArrowDevice { void* private_data; }; -struct ArrowDeviceArrayView { - struct ArrowDevice* device; - struct ArrowArrayView array_view; -}; +/// \brief Pointer to a statically-allocated CPU device singleton +struct ArrowDevice* ArrowDeviceCpu(void); + +/// \brief Initialize a user-allocated device struct with a CPU device +void ArrowDeviceInitCpu(struct ArrowDevice* device); + +/// \brief Resolve a device pointer from a type + identifier +/// +/// Depending on which libraries this build of the device extension was built with, +/// some device types may or may not be supported. The CPU type is always supported. 
-struct ArrowDeviceArrayView {
-  struct ArrowDevice* device;
-  struct ArrowArrayView array_view;
-};
+/// \brief Pointer to a statically-allocated CPU device singleton
+struct ArrowDevice* ArrowDeviceCpu(void);
+
+/// \brief Initialize a user-allocated device struct with a CPU device
+void ArrowDeviceInitCpu(struct ArrowDevice* device);
+
+/// \brief Resolve a device pointer from a type + identifier
+///
+/// Depending on which libraries this build of the device extension was built with,
+/// some device types may or may not be supported. The CPU type is always supported.
+/// Returns NULL for a device that does not exist or cannot be returned as a singleton.
+/// Callers must not release the pointed-to device.
+struct ArrowDevice* ArrowDeviceResolve(ArrowDeviceType device_type, int64_t device_id);
 
 /// \brief Initialize an ArrowDeviceArray
 ///
 /// Given an ArrowArray whose buffers/release callback has been set appropriately,
-/// initialize an ArrowDeviceArray.
-ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device,
-                                    struct ArrowDeviceArray* device_array,
-                                    struct ArrowArray* array, void* sync_event);
+/// initialize an ArrowDeviceArray. If sync_event is non-null, ownership is transferred
+/// to the output array. If stream is non-null, the event must be recorded such that
+/// it captures the work done on stream. If NANOARROW_OK is returned, ownership of array
+/// and sync_event is transferred to device_array. The caller retains ownership of
+/// stream.
+ArrowErrorCode ArrowDeviceArrayInitAsync(struct ArrowDevice* device,
+                                         struct ArrowDeviceArray* device_array,
+                                         struct ArrowArray* array, void* sync_event,
+                                         void* stream);
+
+/// \brief Initialize an ArrowDeviceArray without a stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device,
+                                                  struct ArrowDeviceArray* device_array,
+                                                  struct ArrowArray* array,
+                                                  void* sync_event);
+
+/// \brief Initialize an ArrowDeviceArrayStream from an existing ArrowArrayStream
+///
+/// Wrap an ArrowArrayStream of ArrowDeviceArray objects already allocated by the
+/// specified device as an ArrowDeviceArrayStream. This function moves the ownership
+/// of array_stream to the device_array_stream. If this function returns NANOARROW_OK,
+/// the caller is responsible for releasing the ArrowDeviceArrayStream.
+ArrowErrorCode ArrowDeviceBasicArrayStreamInit(
+    struct ArrowDeviceArrayStream* device_array_stream,
+    struct ArrowArrayStream* array_stream, struct ArrowDevice* device);
 
 /// \brief Initialize an ArrowDeviceArrayView
 ///
@@ -277,71 +326,94 @@ ArrowErrorCode ArrowDeviceArrayViewSetArrayMinimal(
 /// \brief Set ArrowArrayView buffer information from a device array
 ///
 /// Runs ArrowDeviceArrayViewSetArrayMinimal() but also sets buffer sizes for
-/// variable-length buffers by copying data from the device. This function will block on
-/// the device_array's sync_event.
-ArrowErrorCode ArrowDeviceArrayViewSetArray(
+/// variable-length buffers by copying data from the device if needed. If stream
+/// is provided, it will be used to do any copying required to resolve buffer sizes.
+ArrowErrorCode ArrowDeviceArrayViewSetArrayAsync(
+    struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* device_array,
+    void* stream, struct ArrowError* error);
+
+/// \brief Set ArrowArrayView buffer information from a device array without a stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceArrayViewSetArray(
     struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* device_array,
     struct ArrowError* error);
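A hedged usage sketch of the async view path; ViewFromDeviceArray is hypothetical and assumes the existing ArrowDeviceArrayViewInit() helper from this header and the core ArrowArrayViewInitFromSchema() from nanoarrow:

    // Hypothetical consumer: build a view over a device array, queueing any
    // size-resolving copies on the caller's stream instead of blocking.
    static int ViewFromDeviceArray(struct ArrowDeviceArray* device_array,
                                   struct ArrowSchema* schema, CUstream stream,
                                   struct ArrowDeviceArrayView* out,
                                   struct ArrowError* error) {
      ArrowDeviceArrayViewInit(out);
      int code = ArrowArrayViewInitFromSchema(&out->array_view, schema, error);
      if (code != NANOARROW_OK) return code;
      return ArrowDeviceArrayViewSetArrayAsync(out, device_array, (void*)&stream, error);
    }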
 
 /// \brief Copy an ArrowDeviceArrayView to a device
-ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src,
-                                        struct ArrowDevice* device_dst,
-                                        struct ArrowDeviceArray* dst);
+///
+/// If stream is provided, it will be used to launch copies asynchronously.
+/// Note that this implies that all pointers in src will remain valid until
+/// the stream is synchronized.
+ArrowErrorCode ArrowDeviceArrayViewCopyAsync(struct ArrowDeviceArrayView* src,
+                                             struct ArrowDevice* device_dst,
+                                             struct ArrowDeviceArray* dst, void* stream);
+
+/// \brief Copy an ArrowDeviceArrayView to a device without a stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src,
+                                                      struct ArrowDevice* device_dst,
+                                                      struct ArrowDeviceArray* dst);
 
 /// \brief Move an ArrowDeviceArray to a device if possible
 ///
 /// Will attempt to move a device array to a device without copying buffers.
 /// This may result in a device array with different performance charateristics
-/// than an array that was copied.
+/// than an array that was copied. Returns ENOTSUP if a zero-copy move between devices is
+/// not possible.
 ArrowErrorCode ArrowDeviceArrayMoveToDevice(struct ArrowDeviceArray* src,
                                             struct ArrowDevice* device_dst,
                                             struct ArrowDeviceArray* dst);
 
-/// \brief Pointer to a statically-allocated CPU device singleton
-struct ArrowDevice* ArrowDeviceCpu(void);
-
-/// \brief Initialize a user-allocated device struct with a CPU device
-void ArrowDeviceInitCpu(struct ArrowDevice* device);
-
-/// \brief Resolve a device pointer from a type + identifier
+/// \brief Allocate a device buffer by copying existing content
 ///
-/// Depending on which libraries this build of the device extension was built with,
-/// some device types may or may not be supported. The CPU type is always supported.
-/// Returns NULL for device that does not exist or cannot be returned as a singleton.
-/// Callers must not release the pointed-to device.
-struct ArrowDevice* ArrowDeviceResolve(ArrowDeviceType device_type, int64_t device_id);
-
-ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* device_src,
-                                     struct ArrowBufferView src,
-                                     struct ArrowDevice* device_dst,
-                                     struct ArrowBuffer* dst);
+/// If stream is provided, it will be used to launch copies asynchronously.
+/// Note that this implies that src will remain valid until the stream is
+/// synchronized.
+ArrowErrorCode ArrowDeviceBufferInitAsync(struct ArrowDevice* device_src,
+                                          struct ArrowBufferView src,
+                                          struct ArrowDevice* device_dst,
+                                          struct ArrowBuffer* dst, void* stream);
+
+/// \brief Allocate a device buffer by copying existing content without a stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* device_src,
+                                                   struct ArrowBufferView src,
+                                                   struct ArrowDevice* device_dst,
+                                                   struct ArrowBuffer* dst);
 
+/// \brief Move a buffer to a device without copying if possible
+///
+/// Returns ENOTSUP if a zero-copy move between devices is not possible.
 ArrowErrorCode ArrowDeviceBufferMove(struct ArrowDevice* device_src,
                                      struct ArrowBuffer* src,
                                      struct ArrowDevice* device_dst,
                                      struct ArrowBuffer* dst);
 
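A sketch of the async host-to-device buffer path; CopyToDeviceAsync is a hypothetical helper, and per the doc above the caller must keep `data` alive until `stream` is synchronized:

    // Hypothetical helper: queue a CPU -> device copy on the caller's stream.
    static int CopyToDeviceAsync(const void* data, int64_t size_bytes,
                                 struct ArrowDevice* gpu, CUstream stream,
                                 struct ArrowBuffer* out) {
      struct ArrowBufferView src;
      src.data.data = data;
      src.size_bytes = size_bytes;
      // `data` must remain valid until `stream` is synchronized.
      return ArrowDeviceBufferInitAsync(ArrowDeviceCpu(), src, gpu, out,
                                        (void*)&stream);
    }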
-ArrowErrorCode ArrowDeviceBufferCopy(struct ArrowDevice* device_src,
-                                     struct ArrowBufferView src,
-                                     struct ArrowDevice* device_dst,
-                                     struct ArrowBufferView dst);
-
-/// \brief Initialize an ArrowDeviceArrayStream from an existing ArrowArrayStream
+/// \brief Copy a buffer into preallocated device memory
 ///
-/// Wrap an ArrowArrayStream of ArrowDeviceArray objects already allocated by the
-/// specified device as an ArrowDeviceArrayStream. This function moves the ownership of
-/// array_stream to the device_array_stream. If this function returns NANOARROW_OK, the
-/// caller is responsible for releasing the ArrowDeviceArrayStream.
-ArrowErrorCode ArrowDeviceBasicArrayStreamInit(
-    struct ArrowDeviceArrayStream* device_array_stream,
-    struct ArrowArrayStream* array_stream, struct ArrowDevice* device);
+/// If stream is provided, it will be used to launch copies asynchronously.
+/// Note that this implies that src will remain valid until the stream is
+/// synchronized.
+ArrowErrorCode ArrowDeviceBufferCopyAsync(struct ArrowDevice* device_src,
+                                          struct ArrowBufferView src,
+                                          struct ArrowDevice* device_dst,
+                                          struct ArrowBufferView dst, void* stream);
+
+/// \brief Copy a buffer into preallocated device memory without a stream
+///
+/// Convenience wrapper for the case where no stream is provided.
+static inline ArrowErrorCode ArrowDeviceBufferCopy(struct ArrowDevice* device_src,
+                                                   struct ArrowBufferView src,
+                                                   struct ArrowDevice* device_dst,
+                                                   struct ArrowBufferView dst);
 
 /// @}
 
 /// \defgroup nanoarrow_device_cuda CUDA Device extension
 ///
-/// A CUDA (i.e., `cuda_runtime_api.h`) implementation of the Arrow C Device
-/// interface.
+/// A CUDA (i.e., `cuda.h`) implementation of the Arrow C Device interface.
 ///
 /// @{
@@ -394,6 +466,41 @@ ArrowErrorCode ArrowDeviceMetalAlignArrayBuffers(struct ArrowArray* array);
 
 /// @}
 
+// Inline implementations
+
+static inline ArrowErrorCode ArrowDeviceBufferCopy(struct ArrowDevice* device_src,
+                                                   struct ArrowBufferView src,
+                                                   struct ArrowDevice* device_dst,
+                                                   struct ArrowBufferView dst) {
+  return ArrowDeviceBufferCopyAsync(device_src, src, device_dst, dst, NULL);
+}
+
+static inline ArrowErrorCode ArrowDeviceBufferInit(struct ArrowDevice* device_src,
+                                                   struct ArrowBufferView src,
+                                                   struct ArrowDevice* device_dst,
+                                                   struct ArrowBuffer* dst) {
+  return ArrowDeviceBufferInitAsync(device_src, src, device_dst, dst, NULL);
+}
+
+static inline ArrowErrorCode ArrowDeviceArrayViewCopy(struct ArrowDeviceArrayView* src,
+                                                      struct ArrowDevice* device_dst,
+                                                      struct ArrowDeviceArray* dst) {
+  return ArrowDeviceArrayViewCopyAsync(src, device_dst, dst, NULL);
+}
+
+static inline ArrowErrorCode ArrowDeviceArrayViewSetArray(
+    struct ArrowDeviceArrayView* device_array_view, struct ArrowDeviceArray* device_array,
+    struct ArrowError* error) {
+  return ArrowDeviceArrayViewSetArrayAsync(device_array_view, device_array, NULL, error);
+}
+
+static inline ArrowErrorCode ArrowDeviceArrayInit(struct ArrowDevice* device,
+                                                  struct ArrowDeviceArray* device_array,
+                                                  struct ArrowArray* array,
+                                                  void* sync_event) {
+  return ArrowDeviceArrayInitAsync(device, device_array, array, sync_event, NULL);
+}
+
 #ifdef __cplusplus
 }
 #endif
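For completeness, a sketch of how the new pieces compose for a consumer; CopyViewToGpu is hypothetical and assumes a CUDA build. The copy is queued on `stream`, then synchronize_event with a NULL event blocks the CPU until the stream drains so the source buffers can safely be released:

    // Hypothetical end-to-end sketch: copy a view to the GPU on a stream, then
    // synchronize before the source buffers go out of scope.
    static int CopyViewToGpu(struct ArrowDeviceArrayView* src, CUstream stream,
                             struct ArrowDeviceArray* out, struct ArrowError* error) {
      struct ArrowDevice* gpu = ArrowDeviceResolve(ARROW_DEVICE_CUDA, 0);
      if (gpu == NULL) return ENOTSUP;
      int code = ArrowDeviceArrayViewCopyAsync(src, gpu, out, (void*)&stream);
      if (code != NANOARROW_OK) return code;
      // stream only: block the CPU until the queued copies complete.
      return gpu->synchronize_event(gpu, /*sync_event=*/NULL, (void*)&stream, error);
    }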