Commit 33aa54e: WIP

pitrou committed Oct 11, 2023
1 parent 0793432 commit 33aa54e
Showing 7 changed files with 217 additions and 32 deletions.
15 changes: 8 additions & 7 deletions ci/scripts/integration_arrow.sh
@@ -23,22 +23,23 @@ arrow_dir=${1}
 gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
 
 pip install -e $arrow_dir/dev/archery[integration]
-# For C# C Data Interface testing
-pip install pythonnet
+# For C Data Interface testing
+pip install jpype1 pythonnet
 
 # Get more detailed context on crashes
 export PYTHONFAULTHANDLER=1
 
+# --run-ipc \
+# --run-flight \
 
 # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1
 time archery integration \
-    --run-ipc \
-    --run-flight \
+    --run-c-data \
     --with-cpp=1 \
-    --with-csharp=1 \
+    --with-csharp=0 \
     --with-java=1 \
-    --with-js=1 \
-    --with-go=1 \
+    --with-js=0 \
+    --with-go=0 \
     --gold-dirs=$gold_dir/0.14.1 \
     --gold-dirs=$gold_dir/0.17.1 \
     --gold-dirs=$gold_dir/1.0.0-bigendian \
23 changes: 11 additions & 12 deletions dev/archery/archery/integration/datagen.py
@@ -1700,9 +1700,9 @@ def generate_unions_case():
 
 
 def generate_dictionary_case():
-    dict0 = Dictionary(0, StringField('dictionary1'), size=10, name='DICT0')
-    dict1 = Dictionary(1, StringField('dictionary1'), size=5, name='DICT1')
-    dict2 = Dictionary(2, get_field('dictionary2', 'int64'),
+    dict0 = Dictionary(1, StringField('dictionary1'), size=10, name='DICT0')
+    dict1 = Dictionary(2, StringField('dictionary1'), size=5, name='DICT1')
+    dict2 = Dictionary(3, get_field('dictionary2', 'int64'),
                        size=50, name='DICT2')
 
     fields = [
@@ -1716,14 +1716,13 @@ def generate_dictionary_unsigned_case():
 
 
 def generate_dictionary_unsigned_case():
-    dict0 = Dictionary(0, StringField('dictionary0'), size=5, name='DICT0')
-    dict1 = Dictionary(1, StringField('dictionary1'), size=5, name='DICT1')
-    dict2 = Dictionary(2, StringField('dictionary2'), size=5, name='DICT2')
+    dict0 = Dictionary(1, StringField('dictionary0'), size=5, name='DICT0')
+    dict1 = Dictionary(2, StringField('dictionary1'), size=5, name='DICT1')
+    dict2 = Dictionary(3, StringField('dictionary2'), size=5, name='DICT2')
 
     # TODO: JavaScript does not support uint64 dictionary indices, so disabled
     # for now
-
-    # dict3 = Dictionary(3, StringField('dictionary3'), size=5, name='DICT3')
+    # dict3 = Dictionary(4, StringField('dictionary3'), size=5, name='DICT3')
     fields = [
         DictionaryField('f0', get_field('', 'uint8'), dict0),
         DictionaryField('f1', get_field('', 'uint16'), dict1),
@@ -1736,18 +1735,18 @@ def generate_dictionary_unsigned_case():
 
 
 def generate_nested_dictionary_case():
-    dict0 = Dictionary(0, StringField('str'), size=10, name='DICT0')
+    dict0 = Dictionary(1, StringField('str'), size=10, name='DICT0')
 
     list_of_dict = ListField(
         'list',
         DictionaryField('str_dict', get_field('', 'int8'), dict0))
-    dict1 = Dictionary(1, list_of_dict, size=30, name='DICT1')
+    dict1 = Dictionary(2, list_of_dict, size=30, name='DICT1')
 
     struct_of_dict = StructField('struct', [
         DictionaryField('str_dict_a', get_field('', 'int8'), dict0),
         DictionaryField('str_dict_b', get_field('', 'int8'), dict0)
     ])
-    dict2 = Dictionary(2, struct_of_dict, size=30, name='DICT2')
+    dict2 = Dictionary(3, struct_of_dict, size=30, name='DICT2')
 
     fields = [
         DictionaryField('list_dict', get_field('', 'int8'), dict1),
@@ -1760,7 +1759,7 @@
 
 
 def generate_extension_case():
-    dict0 = Dictionary(0, StringField('dictionary0'), size=5, name='DICT0')
+    dict0 = Dictionary(1, StringField('dictionary0'), size=5, name='DICT0')
 
     uuid_type = ExtensionType('uuid', 'uuid-serialized',
                               FixedSizeBinaryField('', 16))
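
These hunks shift every generated dictionary from 0-based to 1-based ids. For
orientation, here is a sketch (not part of the commit) of how datagen.py wires
up a dictionary-encoded field, using the same helpers as the hunks above:

    from archery.integration.datagen import (Dictionary, DictionaryField,
                                             StringField, get_field)

    # A dictionary with id 1 (ids now start at 1) holding five string values.
    dict0 = Dictionary(1, StringField('dictionary0'), size=5, name='DICT0')
    # The column itself stores int8 indices pointing into DICT0.
    f0 = DictionaryField('f0', get_field('', 'int8'), dict0)
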
171 changes: 163 additions & 8 deletions dev/archery/archery/integration/tester_java.py
@@ -16,10 +16,12 @@
 # under the License.
 
 import contextlib
+import functools
 import os
 import subprocess
 
-from .tester import Tester
+from . import cdata
+from .tester import Tester, CDataExporter, CDataImporter
 from .util import run_cmd, log
 from ..utils.source import ARROW_ROOT_DEFAULT

Expand All @@ -42,18 +44,25 @@ def load_version_from_pom():
"ARROW_JAVA_INTEGRATION_JAR",
os.path.join(
ARROW_ROOT_DEFAULT,
"java/tools/target/arrow-tools-{}-"
"jar-with-dependencies.jar".format(_arrow_version),
),
"java/tools/target",
f"arrow-tools-{_arrow_version}-jar-with-dependencies.jar"
)
)
_ARROW_C_DATA_JAR = os.environ.get(
"ARROW_C_DATA_JAVA_INTEGRATION_JAR",
os.path.join(
ARROW_ROOT_DEFAULT,
"java/c/target",
f"arrow-c-data-{_arrow_version}.jar"
)
)
_ARROW_FLIGHT_JAR = os.environ.get(
"ARROW_FLIGHT_JAVA_INTEGRATION_JAR",
os.path.join(
ARROW_ROOT_DEFAULT,
"java/flight/flight-integration-tests/target/"
"flight-integration-tests-{}-jar-with-dependencies.jar".format(
_arrow_version),
),
"java/flight/flight-integration-tests/target",
f"flight-integration-tests-{_arrow_version}-jar-with-dependencies.jar"
)
)
_ARROW_FLIGHT_SERVER = (
"org.apache.arrow.flight.integration.tests.IntegrationTestServer"
@@ -63,11 +72,151 @@
)


@functools.lru_cache
def setup_jpype():
    import jpype
    jar_path = f"{_ARROW_TOOLS_JAR}:{_ARROW_C_DATA_JAR}"
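    # Note: the JVM can only be started once per process (hence the
    # lru_cache above); the ':' class path separator assumes a POSIX host.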
    # XXX Didn't manage to tone down the logging level here (DEBUG -> INFO)
    jpype.startJVM(jpype.getDefaultJVMPath(),
                   "-Djava.class.path=" + jar_path)


class _CDataBase:

    def __init__(self, debug, args):
        import jpype
        self.debug = debug
        self.args = args
        self.ffi = cdata.ffi()
        setup_jpype()
        # JPype pointers to java.io, org.apache.arrow...
        self.java_io = jpype.JPackage("java").io
        self.java_arrow = jpype.JPackage("org").apache.arrow
        self.java_allocator = self._make_java_allocator()

    def _pointer_to_int(self, c_ptr):
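        # Cast the cffi pointer to a plain integer address; the Java side
        # wraps the same C struct via ArrowSchema.wrap / ArrowArray.wrap below.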
        return int(self.ffi.cast('uintptr_t', c_ptr))

    def _wrap_c_schema_ptr(self, c_schema_ptr):
        return self.java_arrow.c.ArrowSchema.wrap(
            self._pointer_to_int(c_schema_ptr))

    def _wrap_c_array_ptr(self, c_array_ptr):
        return self.java_arrow.c.ArrowArray.wrap(
            self._pointer_to_int(c_array_ptr))

    def _make_java_allocator(self):
        # Return a new allocator
        return self.java_arrow.memory.RootAllocator()

    def _assert_schemas_equal(self, expected, actual):
        # XXX This is fragile for dictionaries, as Schema.equals compares
        # dictionary ids!
        # Should perhaps instead add a logical comparison function in
        # org.apache.arrow.vector.util.DictionaryUtil
        if not expected.equals(actual):
            raise AssertionError(
                f"Java Schemas are not equal:\n"
                f"* expected = {expected.toString()}\n"
                f"* actual = {actual.toString()}")


class JavaCDataExporter(CDataExporter, _CDataBase):

    def export_schema_from_json(self, json_path, c_schema_ptr):
        json_file = self.java_io.File(json_path)
        with self.java_arrow.vector.ipc.JsonFileReader(
                json_file, self.java_allocator) as json_reader:
            schema = json_reader.start()
            dict_provider = json_reader
            self.java_arrow.c.Data.exportSchema(
                self.java_allocator, schema, dict_provider,
                self._wrap_c_schema_ptr(c_schema_ptr)
            )

    def export_batch_from_json(self, json_path, num_batch, c_array_ptr):
        json_file = self.java_io.File(json_path)
        with self.java_arrow.vector.ipc.JsonFileReader(
                json_file, self.java_allocator) as json_reader:
            json_reader.start()
            if num_batch > 0:
                actually_skipped = json_reader.skip(num_batch)
                assert actually_skipped == num_batch
            with json_reader.read() as batch:
                dict_provider = json_reader
                self.java_arrow.c.Data.exportVectorSchemaRoot(
                    self.java_allocator, batch, dict_provider,
                    self._wrap_c_array_ptr(c_array_ptr))

    @property
    def supports_releasing_memory(self):
        return True

    def record_allocation_state(self):
        return self.java_allocator.getAllocatedMemory()

    def compare_allocation_state(self, recorded, gc_until):
        def pred():
            return self.java_allocator.getAllocatedMemory() == recorded

        return gc_until(pred)


class JavaCDataImporter(CDataImporter, _CDataBase):

    def import_schema_and_compare_to_json(self, json_path, c_schema_ptr):
        json_file = self.java_io.File(json_path)
        with self.java_arrow.vector.ipc.JsonFileReader(
                json_file, self.java_allocator) as json_reader:
            json_schema = json_reader.start()
            with self.java_arrow.c.CDataDictionaryProvider() as dict_provider:
                imported_schema = self.java_arrow.c.Data.importSchema(
                    self.java_allocator,
                    self._wrap_c_schema_ptr(c_schema_ptr),
                    dict_provider)
                self._assert_schemas_equal(json_schema, imported_schema)

    def import_batch_and_compare_to_json(self, json_path, num_batch,
                                         c_array_ptr):
        json_file = self.java_io.File(json_path)
        with self.java_arrow.vector.ipc.JsonFileReader(
                json_file, self.java_allocator) as json_reader:
            schema = json_reader.start()
            if num_batch > 0:
                actually_skipped = json_reader.skip(num_batch)
                assert actually_skipped == num_batch
            with (json_reader.read() as batch,
                  self.java_arrow.vector.VectorSchemaRoot.create(
                      schema, self.java_allocator) as imported_batch):
                # We need to pass a dict provider primed with dictionary ids
                # matching those in the schema, hence an empty
                # CDataDictionaryProvider would not work here!
                dict_provider = json_reader
                self.java_arrow.c.Data.importIntoVectorSchemaRoot(
                    self.java_allocator,
                    self._wrap_c_array_ptr(c_array_ptr),
                    imported_batch, dict_provider)
                # TODO print nice error message if not equal
                assert imported_batch.equals(batch)

    @property
    def supports_releasing_memory(self):
        return True

    def gc_until(self, predicate):
        # No need to call the Java GC thanks to AutoCloseable (?)
        return predicate()


class JavaTester(Tester):
    PRODUCER = True
    CONSUMER = True
    FLIGHT_SERVER = True
    FLIGHT_CLIENT = True
    C_DATA_SCHEMA_EXPORTER = True
    C_DATA_SCHEMA_IMPORTER = True
    C_DATA_ARRAY_EXPORTER = True
    C_DATA_ARRAY_IMPORTER = True

    name = 'Java'

@@ -186,3 +335,9 @@ def flight_server(self, scenario_name=None):
        finally:
            server.kill()
            server.wait(5)

    def make_c_data_exporter(self):
        return JavaCDataExporter(self.debug, self.args)

    def make_c_data_importer(self):
        return JavaCDataImporter(self.debug, self.args)
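
For orientation, a minimal sketch (not part of the commit) of the round trip
these classes enable. The JSON file name and the gc_until helper are
illustrative assumptions; running this needs the arrow-tools and arrow-c-data
jars built and jpype1 installed:

    import time

    from archery.integration import cdata
    from archery.integration.tester_java import (JavaCDataExporter,
                                                 JavaCDataImporter)

    def gc_until(predicate, timeout=1.0):
        # Stand-in for the harness helper: poll until the predicate holds,
        # since Java-side memory may be released asynchronously.
        deadline = time.time() + timeout
        while not predicate():
            if time.time() > deadline:
                return False
            time.sleep(0.05)
        return True

    ffi = cdata.ffi()
    c_schema = ffi.new("struct ArrowSchema*")  # C-side struct shared by both

    exporter = JavaCDataExporter(False, None)
    importer = JavaCDataImporter(False, None)

    before = exporter.record_allocation_state()
    # Java -> C: fill the ArrowSchema struct from a JSON integration file
    exporter.export_schema_from_json("generated_primitive.json", c_schema)
    # C -> Java: import it back and compare against the same JSON file
    importer.import_schema_and_compare_to_json("generated_primitive.json",
                                               c_schema)
    # Check that all Java-side memory was eventually released
    print(exporter.compare_allocation_state(before, gc_until))
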
11 changes: 8 additions & 3 deletions docker-compose.yml
@@ -1730,16 +1730,21 @@ services:
     volumes: *conda-volumes
     environment:
       <<: [*common, *ccache]
-      # tell archery where the arrow binaries are located
-      ARCHERY_INTEGRATION_WITH_RUST: 0
+      # Tell Archery where the arrow C++ binaries are located
       ARROW_CPP_EXE_PATH: /build/cpp/debug
       ARROW_GO_INTEGRATION: 1
+      ARCHERY_INTEGRATION_WITH_RUST: 0
+      ARROW_JAVA_CDATA: "ON"
+      JAVA_JNI_CMAKE_ARGS: >-
+        -DARROW_JAVA_JNI_ENABLE_DEFAULT=OFF
+        -DARROW_JAVA_JNI_ENABLE_C=ON
     command:
       ["/arrow/ci/scripts/rust_build.sh /arrow /build &&
        /arrow/ci/scripts/cpp_build.sh /arrow /build &&
        /arrow/ci/scripts/csharp_build.sh /arrow /build &&
        /arrow/ci/scripts/go_build.sh /arrow &&
-       /arrow/ci/scripts/java_build.sh /arrow /build &&
+       /arrow/ci/scripts/java_jni_build.sh /arrow $${ARROW_HOME} /build /tmp/dist/java/$$(arch) &&
+       /arrow/ci/scripts/java_build.sh /arrow /build /tmp/dist/java &&
        /arrow/ci/scripts/js_build.sh /arrow /build &&
        /arrow/ci/scripts/integration_arrow.sh /arrow /build"]
4 changes: 4 additions & 0 deletions java/c/src/main/java/org/apache/arrow/c/Format.java
@@ -138,6 +138,8 @@ static String asString(ArrowType arrowType) {
         return "tiD";
       case YEAR_MONTH:
         return "tiM";
+      case MONTH_DAY_NANO:
+        return "tin";
       default:
         throw new UnsupportedOperationException(
             String.format("Interval type with unit %s is unsupported", type.getUnit()));
@@ -277,6 +279,8 @@ static ArrowType asType(String format, long flags)
       return new ArrowType.Interval(IntervalUnit.YEAR_MONTH);
     case "tiD":
       return new ArrowType.Interval(IntervalUnit.DAY_TIME);
+    case "tin":
+      return new ArrowType.Interval(IntervalUnit.MONTH_DAY_NANO);
     case "+l":
       return new ArrowType.List();
     case "+L":
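
The new "tin" entries map the MONTH_DAY_NANO interval unit to its Arrow C Data
Interface format string. A quick way to see that string from Python (a sketch,
not part of the commit; it leans on pyarrow's private _export_to_c helper):

    import pyarrow as pa
    from pyarrow.cffi import ffi

    c_schema = ffi.new("struct ArrowSchema*")
    ptr = int(ffi.cast("uintptr_t", c_schema))
    pa.field("ival", pa.month_day_nano_interval())._export_to_c(ptr)
    print(ffi.string(c_schema.format).decode())  # -> tin
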
21 changes: 21 additions & 0 deletions java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
@@ -237,6 +237,27 @@ public VectorSchemaRoot read() throws IOException {
     }
   }
 
+  /**
+   * Skips a number of record batches in the file.
+   *
+   * @param numBatches the number of batches to skip
+   * @return the actual number of skipped batches.
+   */
+  public int skip(int numBatches) throws IOException {
+    for (int i = 0; i < numBatches; ++i) {
+      JsonToken t = parser.nextToken();
+      if (t == START_OBJECT) {
+        parser.skipChildren();
+        assert parser.getCurrentToken() == END_OBJECT;
+      } else if (t == END_ARRAY) {
+        return i;
+      } else {
+        throw new IllegalArgumentException("Invalid token: " + t);
+      }
+    }
+    return numBatches;
+  }
+
   private abstract class BufferReader {
     protected abstract ArrowBuf read(BufferAllocator allocator, int count) throws IOException;
4 changes: 2 additions & 2 deletions java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -112,8 +112,8 @@ public ArrowRecordBatch(
       }
       long size = arrowBuf.readableBytes();
       arrowBuffers.add(new ArrowBuffer(offset, size));
-      if (LOGGER.isDebugEnabled()) {
-        LOGGER.debug("Buffer in RecordBatch at {}, length: {}", offset, size);
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Buffer in RecordBatch at {}, length: {}", offset, size);
       }
       offset += size;
       if (alignBuffers) { // align on 8 byte boundaries
