Skip to content

Commit

Permalink
More Arrow-centric testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Dec 29, 2022
1 parent bff6733 commit 11cf4bc
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 101 deletions.
99 changes: 53 additions & 46 deletions py/server/deephaven/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
"""This module supports conversions between Arrow tables and Deephaven tables."""
import typing
from typing import List

from typing import List, Dict

import jpy
import pyarrow as pa
Expand All @@ -14,56 +14,63 @@
_JArrowToTableConverter = jpy.get_type("io.deephaven.extensions.barrage.util.ArrowToTableConverter")
_JTableToArrowConverter = jpy.get_type("io.deephaven.extensions.barrage.util.TableToArrowConverter")


def _map_arrow_type(arrow_type) -> typing.Dict[str, str]:
# Maps an Arrow data type to the name of the corresponding Deephaven column
# data type.  An empty-string value means the Arrow type is currently NOT
# supported by the conversion (kept in the table so the full Arrow type
# inventory is visible in one place).
_ARROW_DH_DATA_TYPE_MAPPING = {
    pa.null(): '',
    pa.bool_(): '',
    pa.int8(): 'byte',
    pa.int16(): 'short',
    pa.int32(): 'int',
    pa.int64(): 'long',
    pa.uint8(): '',
    pa.uint16(): 'char',
    pa.uint32(): '',
    pa.uint64(): '',
    pa.float16(): '',
    pa.float32(): 'float',
    pa.float64(): 'double',
    pa.time32('s'): '',
    pa.time32('ms'): '',
    pa.time64('us'): '',
    pa.time64('ns'): 'io.deephaven.time.DateTime',
    pa.timestamp('s', tz=None): '',
    pa.timestamp('ms', tz=None): '',
    pa.timestamp('us', tz=None): '',
    pa.timestamp('ns', tz=None): '',
    pa.date32(): 'java.time.LocalDate',
    pa.date64(): 'java.time.LocalDate',
    pa.duration('s'): '',
    pa.duration('ms'): '',
    pa.duration('us'): '',
    pa.duration('ns'): '',
    pa.month_day_nano_interval(): '',
    pa.binary(): '',
    pa.string(): 'java.lang.String',
    pa.utf8(): 'java.lang.String',
    pa.large_binary(): '',
    pa.large_string(): '',
    pa.large_utf8(): '',
    # Parameterized/nested types not enumerated above:
    # decimal128(int precision, int scale=0)
    # list_(value_type, int list_size=-1)
    # large_list(value_type)
    # map_(key_type, item_type[, keys_sorted])
    # struct(fields)
    # dictionary(index_type, value_type, …)
}

# The Arrow types with a non-empty Deephaven mapping above, i.e. the types
# that to_table()/to_arrow() can convert.  Referenced in error messages.
SUPPORTED_ARROW_TYPES = [k for k, v in _ARROW_DH_DATA_TYPE_MAPPING.items() if v]


def _map_arrow_type(arrow_type) -> Dict[str, str]:
    """Maps an Arrow type to the corresponding Deephaven column data type.

    Args:
        arrow_type (pa.DataType): the Arrow data type of a column

    Returns:
        a one-entry dict {"deephaven:type": <type name>} suitable for use as
        Arrow field metadata

    Raises:
        DHError: when the Arrow type has no Deephaven equivalent
    """
    # BUG FIX: this span interleaved the deleted pre-refactor implementation
    # (a duplicate inline copy of the type-mapping dict and an old lookup on
    # it) with the new one; the dead duplicate is removed and the lookup goes
    # through the shared module-level _ARROW_DH_DATA_TYPE_MAPPING.
    dh_type = _ARROW_DH_DATA_TYPE_MAPPING.get(arrow_type)
    if not dh_type:
        # Timestamps with an explicit time zone are not keys in the mapping
        # table (only tz=None variants are), but they are still convertible.
        if isinstance(arrow_type, pa.TimestampType) and arrow_type.tz:
            dh_type = "io.deephaven.time.DateTime"

    if not dh_type:
        raise DHError(message=f'unsupported arrow data type : {arrow_type}, refer to '
                              f'deephaven.arrow.SUPPORTED_ARROW_TYPES for the list of supported Arrow types.')

    return {"deephaven:type": dh_type}

Expand Down
122 changes: 67 additions & 55 deletions py/server/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
#
import unittest
from datetime import datetime
from typing import List, Any

import numpy as np
import pyarrow as pa
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as papq

from deephaven import arrow as dharrow, dtypes, new_table, DHError, time_table
from deephaven import arrow as dharrow, dtypes, new_table, time_table
from deephaven.column import byte_col, char_col, short_col, int_col, long_col, float_col, double_col, \
string_col, datetime_col
from deephaven.table import Table
Expand Down Expand Up @@ -40,54 +41,76 @@ def setUpClass(cls) -> None:
def tearDownClass(cls) -> None:
    # Drop the shared fixture table created in setUpClass so the underlying
    # engine resources are released when the test class finishes.
    # NOTE(review): the @classmethod decorator is presumably on the line
    # above, inside the collapsed diff region -- confirm in the full file.
    del cls.test_table

def test_arrow_types(self):
pa_types = (
pa.int32(),
pa.int64(),
# pa.time32('s'),
# pa.time64('us'),
def verify_type_conversion(self, pa_types: List[pa.DataType], pa_data: List[Any]):
    """Round-trips the given Arrow columns through a Deephaven table and back.

    Args:
        pa_types: one Arrow data type per column
        pa_data: one Arrow array per column, all of equal length

    Asserts that the Deephaven table has one row per input element and that
    the round-tripped Arrow table equals the original exactly.
    """
    fields = [pa.field(f"f{i}", ty) for i, ty in enumerate(pa_types)]
    schema = pa.schema(fields)
    pa_table = pa.table(pa_data, schema=schema)
    dh_table = dharrow.to_table(pa_table)
    arrow_table = dharrow.to_arrow(dh_table)
    # generalized from a hard-coded 2: the row count must match the input
    # arrays, whatever their length (all current callers pass 2 rows)
    self.assertEqual(dh_table.size, len(pa_data[0]))
    self.assertTrue(pa_table.equals(arrow_table))

def test_arrow_types_integers(self):
    """Round-trips the signed integer Arrow types of every width."""
    with self.subTest("signed integers"):
        # one (type, extreme values) pair per signed width; the data stays
        # one step inside the most-negative representable value
        cases = [
            (pa.int8(), [2 ** 7 - 1, -2 ** 7 + 1]),
            (pa.int16(), [2 ** 15 - 1, -2 ** 15 + 1]),
            (pa.int32(), [2 ** 31 - 1, -2 ** 31 + 1]),
            (pa.int64(), [2 ** 63 - 1, -2 ** 63 + 1]),
        ]
        pa_types = [ty for ty, _ in cases]
        pa_data = [pa.array(vals) for _, vals in cases]
        self.verify_type_conversion(pa_types=pa_types, pa_data=pa_data)

@unittest.skip("Not correctly widened")
def test_arrow_types_unsigned_integers(self):
    """Round-trips the unsigned integer Arrow types (currently skipped)."""
    with self.subTest("unsigned integers"):
        pa_types = [
            pa.uint16(),
        ]
        pa_data = [
            pa.array([2 ** 16 - 1, 0]),
        ]
        # BUG FIX: the original built the fixture but never exercised it, so
        # the test would pass vacuously once the skip decorator is removed
        self.verify_type_conversion(pa_types=pa_types, pa_data=pa_data)

@unittest.skip("Not correctly converted by DH")
def test_arrow_types_time(self):
    """Round-trips the time-related Arrow types (currently skipped)."""
    # BUG FIX: the span interleaved stale deleted entries
    # (pa.timestamp('us'), pa.timestamp('us', tz='Europe/Paris')) into
    # pa_types, leaving 5 types against 3 data arrays -- the schema and the
    # data must have the same number of columns
    pa_types = [
        pa.time64('ns'),
        pa.date32(),
        pa.timestamp('ns', tz='Europe/Paris'),
    ]
    # one 2-row array per type above
    pa_data = [
        pa.array([1_000_001, 1_000_002]),
        pa.array([datetime(2022, 12, 7), datetime(2022, 12, 30)]),
        pa.array([pd.Timestamp('2017-01-01T12:01:01', tz='UTC'),
                  pd.Timestamp('2017-01-01T11:01:01', tz='Europe/Paris')]),
    ]
    self.verify_type_conversion(pa_types=pa_types, pa_data=pa_data)

def test_arrow_types_floating(self):
    """Round-trips the floating point Arrow types."""
    # BUG FIX: the span interleaved stale deleted lines (pa.string(),
    # pa.array(["foo", "bar"]), and a mismatched ')' list terminator) from
    # the old monolithic test -- string coverage now lives in
    # test_arrow_types_text_binary, and types/data must pair up 1:1
    pa_types = [
        pa.float32(),
        pa.float64(),
        # pa.decimal128(19, 4),
        # pa.decimal256(76, 38),
    ]
    pa_data = [
        pa.array([1.1, 2.2], pa.float32()),
        pa.array([1.1, 2.2], pa.float64()),
    ]
    self.verify_type_conversion(pa_types=pa_types, pa_data=pa_data)

fields = [pa.field(f"field_name_{i}", ty) for i, ty in enumerate(pa_types)]
schema = pa.schema(fields)
pa_table = pa.table(pa_data, schema=schema)
dh_table = dharrow.to_table(pa_table)
self.assertEqual(dh_table.size, 2)
def test_arrow_types_text_binary(self):
    """Round-trips the text Arrow types through a Deephaven table."""
    # a single UTF-8 string column with two rows
    pa_types = [pa.string()]
    pa_data = [pa.array(["foo", "bar"])]
    self.verify_type_conversion(pa_types=pa_types, pa_data=pa_data)

def test_against_parquet(self):
arrow_table = papq.read_table("tests/data/crypto_trades.parquet")
Expand Down Expand Up @@ -139,17 +162,6 @@ def test_round_trip_cols(self):
dh_table_1 = dharrow.to_table(pa_table_cols)
self.assert_table_equals(dh_table_1, dh_table)

def test_for_a_potential_bug(self):
    """Converting a single column of the crypto trades Parquet file is
    expected to raise a DHError wrapping a Java NoSuchElementException."""
    arrow_table = papq.read_table("tests/data/crypto_trades.parquet")

    with self.assertRaises(DHError) as cm:
        dh_table = dharrow.to_table(arrow_table, cols=["t_date"])
    # NOTE(review): the three raw strings below are standalone expression
    # statements, NOT a continuation of the assignment -- only the first
    # line is bound to ex_msg and matched by assertRegex; the rest are
    # no-ops.  Parenthesizing them into one literal would change (and,
    # given the unescaped '$' and leading '*', likely break) the regex, so
    # they are left byte-identical -- confirm the intended pattern with the
    # author before "fixing" this.
    ex_msg = r"RuntimeError: java.util.NoSuchElementException"
    r"*gnu.trove.list.array.TLongArrayList$TLongArrayIterator.next"
    r"*io.deephaven.extensions.barrage.chunk.VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream"
    r"*io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.extractChunkFromInputStream"
    self.assertRegex(str(cm.exception), ex_msg)

def test_ticking_table(self):
table = time_table("00:00:00.001").update(["X = i", "Y = String.valueOf(i)"])
self.wait_ticking_table_update(table, row_count=100, timeout=5)
Expand Down

0 comments on commit 11cf4bc

Please sign in to comment.