From f0ce84552791eb0da34be95d241f978424d2c6da Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Wed, 15 Nov 2023 23:57:57 +0100
Subject: [PATCH] GH-38798: [Integration] Enable C Data Interface integration testing on Rust

---
Reviewer notes (commentary only; not part of the commit message or the diff):

  * cdata.py gains a module-level `dll_suffix` (".dylib" when sys.platform is
    "darwin", ".dll" when os.name is "nt", ".so" otherwise). tester_cpp.py
    and tester_go.py drop their private `_dll_suffix` copies and use
    `cdata.dll_suffix` instead.
  * tester_rust.py now takes its binary directory from the
    ARROW_RUST_EXE_PATH environment variable, falling back to
    "rust/target/debug" under ARROW_ROOT_DEFAULT, and dlopen()s
    "libarrow_integration_testing" + cdata.dll_suffix from that directory.
  * Every Rust entrypoint declared in `_rust_c_data_entrypoints` returns a
    `const char*`; `_check_rust_error` treats NULL as success, otherwise
    raises RuntimeError with the decoded message and always hands the
    pointer back to `arrow_rs_free_error` in a `finally` clause.
  * rust_build.sh now builds into ${2}/rust via `--target-dir`, and
    docker-compose.yml sets ARROW_RUST_EXE_PATH to /build/rust/debug
    (presumably the same directory when the script is given /build as its
    second argument; confirm against integration_arrow_build.sh).
  * NOTE(review): the cleanup line `rm -rf target/debug/deps` in
    rust_build.sh is unchanged context, so it is still relative to
    ${source_dir}. With `--target-dir ${build_dir}` it presumably no longer
    removes the build products it was meant to remove; confirm.
  * NOTE(review): RustCDataExporter.record_allocation_state() always returns
    0 (see the FIXME in the hunk), so its result never varies between calls
    even though supports_releasing_memory is True.

 ci/scripts/integration_arrow.sh               |   1 +
 ci/scripts/rust_build.sh                      |   3 +-
 dev/archery/archery/integration/cdata.py      |   9 ++
 dev/archery/archery/integration/tester_cpp.py |  10 +-
 dev/archery/archery/integration/tester_go.py  |  10 +-
 .../archery/integration/tester_rust.py        | 114 +++++++++++++++++-
 docker-compose.yml                            |   3 +-
 7 files changed, 128 insertions(+), 22 deletions(-)

diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh
index b5a38f01412d4..6d1d4befa6964 100755
--- a/ci/scripts/integration_arrow.sh
+++ b/ci/scripts/integration_arrow.sh
@@ -43,6 +43,7 @@ fi
 # Get more detailed context on crashes
 export PYTHONFAULTHANDLER=1
 
+
 # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
 time archery integration \
     --run-c-data \
diff --git a/ci/scripts/rust_build.sh b/ci/scripts/rust_build.sh
index 2dfc0f1b1892d..5fc21d454b080 100755
--- a/ci/scripts/rust_build.sh
+++ b/ci/scripts/rust_build.sh
@@ -21,6 +21,7 @@ set -e
 
 arrow_dir=${1}
 source_dir=${1}/rust
+build_dir=${2}/rust
 
 # This file is used to build the rust binaries needed for the archery
 # integration tests. Testing of the rust implementation in normal CI is handled
@@ -54,7 +55,7 @@ rustup show
 pushd ${source_dir}
 
 # build only the integration testing binaries
-cargo build -p arrow-integration-testing
+cargo build -p arrow-integration-testing --target-dir ${build_dir}
 
 # Save disk space by removing large temporary build products
 rm -rf target/debug/deps
diff --git a/dev/archery/archery/integration/cdata.py b/dev/archery/archery/integration/cdata.py
index 8e5550fcdb9c5..a5dbbe29d8aba 100644
--- a/dev/archery/archery/integration/cdata.py
+++ b/dev/archery/archery/integration/cdata.py
@@ -18,10 +18,19 @@
 import cffi
 from contextlib import contextmanager
 import functools
+import os
+import sys
 
 from .tester import CDataExporter, CDataImporter
 
 
+if sys.platform == "darwin":
+    dll_suffix = ".dylib"
+elif os.name == "nt":
+    dll_suffix = ".dll"
+else:
+    dll_suffix = ".so"
+
 _c_data_decls = """
     struct ArrowSchema {
       // Array type description
diff --git a/dev/archery/archery/integration/tester_cpp.py b/dev/archery/archery/integration/tester_cpp.py
index 658e71330155e..02c110c0e20a4 100644
--- a/dev/archery/archery/integration/tester_cpp.py
+++ b/dev/archery/archery/integration/tester_cpp.py
@@ -18,7 +18,6 @@
 import contextlib
 import functools
 import os
-import sys
 import subprocess
 
 from . import cdata
@@ -42,15 +41,8 @@
     "localhost",
 ]
 
-if sys.platform == "darwin":
-    _dll_suffix = ".dylib"
-elif os.name == "nt":
-    _dll_suffix = ".dll"
-else:
-    _dll_suffix = ".so"
-
 _DLL_PATH = _EXE_PATH
-_ARROW_DLL = os.path.join(_DLL_PATH, "libarrow" + _dll_suffix)
+_ARROW_DLL = os.path.join(_DLL_PATH, "libarrow" + cdata.dll_suffix)
 
 
 class CppTester(Tester):
diff --git a/dev/archery/archery/integration/tester_go.py b/dev/archery/archery/integration/tester_go.py
index 2b3dc3a1be336..5368f06a318e5 100644
--- a/dev/archery/archery/integration/tester_go.py
+++ b/dev/archery/archery/integration/tester_go.py
@@ -18,7 +18,6 @@
 import contextlib
 import functools
 import os
-import sys
 import subprocess
 
 from . import cdata
@@ -43,17 +42,10 @@
     "localhost",
 ]
 
-if sys.platform == "darwin":
-    _dll_suffix = ".dylib"
-elif os.name == "nt":
-    _dll_suffix = ".dll"
-else:
-    _dll_suffix = ".so"
-
 _DLL_PATH = os.path.join(
     ARROW_ROOT_DEFAULT,
     "go/arrow/internal/cdata_integration")
-_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + _dll_suffix)
+_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + cdata.dll_suffix)
 
 
 class GoTester(Tester):
diff --git a/dev/archery/archery/integration/tester_rust.py b/dev/archery/archery/integration/tester_rust.py
index c7a94de2197bd..c3303342df6ca 100644
--- a/dev/archery/archery/integration/tester_rust.py
+++ b/dev/archery/archery/integration/tester_rust.py
@@ -16,15 +16,19 @@
 # under the License.
 
 import contextlib
+import functools
 import os
 import subprocess
 
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
 from .util import run_cmd, log
 from ..utils.source import ARROW_ROOT_DEFAULT
 
 
-_EXE_PATH = os.path.join(ARROW_ROOT_DEFAULT, "rust/target/debug")
+_EXE_PATH = os.environ.get(
+    "ARROW_RUST_EXE_PATH", os.path.join(ARROW_ROOT_DEFAULT, "rust/target/debug")
+)
 _INTEGRATION_EXE = os.path.join(_EXE_PATH, "arrow-json-integration-test")
 _STREAM_TO_FILE = os.path.join(_EXE_PATH, "arrow-stream-to-file")
 _FILE_TO_STREAM = os.path.join(_EXE_PATH, "arrow-file-to-stream")
@@ -37,12 +41,19 @@
     "localhost",
 ]
 
+_INTEGRATION_DLL = os.path.join(_EXE_PATH,
+                                "libarrow_integration_testing" + cdata.dll_suffix)
+
 
 class RustTester(Tester):
     PRODUCER = True
     CONSUMER = True
     FLIGHT_SERVER = True
     FLIGHT_CLIENT = True
+    C_DATA_SCHEMA_EXPORTER = True
+    C_DATA_ARRAY_EXPORTER = True
+    C_DATA_SCHEMA_IMPORTER = True
+    C_DATA_ARRAY_IMPORTER = True
 
     name = 'Rust'
 
@@ -117,3 +128,102 @@ def flight_request(self, port, json_path=None, scenario_name=None):
         if self.debug:
             log(' '.join(cmd))
         run_cmd(cmd)
+
+    def make_c_data_exporter(self):
+        return RustCDataExporter(self.debug, self.args)
+
+    def make_c_data_importer(self):
+        return RustCDataImporter(self.debug, self.args)
+
+
+_rust_c_data_entrypoints = """
+    const char* arrow_rs_cdata_integration_export_schema_from_json(
+        const char* json_path, uintptr_t out);
+    const char* arrow_rs_cdata_integration_import_schema_and_compare_to_json(
+        const char* json_path, uintptr_t c_schema);
+
+    const char* arrow_rs_cdata_integration_export_batch_from_json(
+        const char* json_path, int num_batch, uintptr_t out);
+    const char* arrow_rs_cdata_integration_import_batch_and_compare_to_json(
+        const char* json_path, int num_batch, uintptr_t c_array);
+
+    void arrow_rs_free_error(const char*);
+    """
+
+
+@functools.lru_cache
+def _load_ffi(ffi, lib_path=_INTEGRATION_DLL):
+    ffi.cdef(_rust_c_data_entrypoints)
+    dll = ffi.dlopen(lib_path)
+    return dll
+
+
+class _CDataBase:
+
+    def __init__(self, debug, args):
+        self.debug = debug
+        self.args = args
+        self.ffi = cdata.ffi()
+        self.dll = _load_ffi(self.ffi)
+
+    def _pointer_to_int(self, c_ptr):
+        return self.ffi.cast('uintptr_t', c_ptr)
+
+    def _check_rust_error(self, rs_error):
+        """
+        Check a `const char*` error return from an integration entrypoint.
+
+        A null means success, a non-empty string is an error message.
+        The string is dynamically allocated on the Rust side.
+        """
+        assert self.ffi.typeof(rs_error) is self.ffi.typeof("const char*")
+        if rs_error != self.ffi.NULL:
+            try:
+                error = self.ffi.string(rs_error).decode(
+                    'utf8', errors='replace')
+                raise RuntimeError(
+                    f"Rust C Data Integration call failed: {error}")
+            finally:
+                self.dll.arrow_rs_free_error(rs_error)
+
+
+class RustCDataExporter(CDataExporter, _CDataBase):
+
+    def export_schema_from_json(self, json_path, c_schema_ptr):
+        rs_error = self.dll.arrow_rs_cdata_integration_export_schema_from_json(
+            str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
+        self._check_rust_error(rs_error)
+
+    def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
+        rs_error = self.dll.arrow_rs_cdata_integration_export_batch_from_json(
+            str(json_path).encode(), num_batch,
+            self._pointer_to_int(c_array_ptr))
+        self._check_rust_error(rs_error)
+
+    @property
+    def supports_releasing_memory(self):
+        return True
+
+    def record_allocation_state(self):
+        # FIXME is it possible to measure the amount of Rust-allocated memory?
+        return 0
+
+
+class RustCDataImporter(CDataImporter, _CDataBase):
+
+    def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
+        rs_error = \
+            self.dll.arrow_rs_cdata_integration_import_schema_and_compare_to_json(
+                str(json_path).encode(), self._pointer_to_int(c_schema_ptr))
+        self._check_rust_error(rs_error)
+
+    def import_batch_and_compare_to_json(self, json_path, num_batch,
+                                         c_array_ptr):
+        rs_error = \
+            self.dll.arrow_rs_cdata_integration_import_batch_and_compare_to_json(
+                str(json_path).encode(), num_batch, self._pointer_to_int(c_array_ptr))
+        self._check_rust_error(rs_error)
+
+    @property
+    def supports_releasing_memory(self):
+        return True
diff --git a/docker-compose.yml b/docker-compose.yml
index e2c993ee9ea41..8cc05903e9d5b 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1716,8 +1716,9 @@ services:
     environment:
       <<: [*common, *ccache]
       ARCHERY_INTEGRATION_WITH_RUST: 0
-      # Tell Archery where the arrow C++ binaries are located
+      # Tell Archery where Arrow binaries are located
       ARROW_CPP_EXE_PATH: /build/cpp/debug
+      ARROW_RUST_EXE_PATH: /build/rust/debug
     command:
       ["/arrow/ci/scripts/integration_arrow_build.sh /arrow /build &&
         /arrow/ci/scripts/integration_arrow.sh /arrow /build"]