diff --git a/py/client/README.md b/py/client/README.md index 66cfb0c3e94..3ac7c02f628 100644 --- a/py/client/README.md +++ b/py/client/README.md @@ -205,19 +205,19 @@ session.bind_table(name="my_table", table=table) Deephaven natively supports [PyArrow tables](https://arrow.apache.org/docs/python/index.html). This example converts between a PyArrow table and a Deephaven table. ``` -import pyarrow +import pyarrow as pa from pydeephaven import Session session = Session() -arr = pyarrow.array([4,5,6], type=pyarrow.int32()) -pyarrow_table = pyarrow.Table.from_arrays([arr], names=["Integers"]) +arr = pa.array([4,5,6], type=pa.int32()) +pa_table = pa.Table.from_arrays([arr], names=["Integers"]) -table = session.import_table(pyarrow_table) +table = session.import_table(pa_table) session.bind_table(name="my_table", table=table) #Convert the Deephaven table back to a pyarrow table -pyarrow_table = table.to_arrow() +pa_table = table.to_arrow() ``` ## Execute a script server side diff --git a/py/client/docs/source/examples.rst b/py/client/docs/source/examples.rst index 4cb70f07aa0..a37cc0c8600 100644 --- a/py/client/docs/source/examples.rst +++ b/py/client/docs/source/examples.rst @@ -160,23 +160,23 @@ Convert a PyArrow table to a Deephaven table Deephaven natively supports PyArrow tables. This example converts between a PyArrow table and a Deephaven table: - import pyarrow + import pyarrow as pa from pydeephaven import Session session = Session() - arr = pyarrow.array([4,5,6], type=pyarrow.int32()) + arr = pa.array([4,5,6], type=pa.int32()) - pyarrow_table = pyarrow.Table.from_arrays([arr], names=["Integers"]) + pa_table = pa.Table.from_arrays([arr], names=["Integers"]) - table = session.import_table(pyarrow_table) + table = session.import_table(pa_table) session.bind_table(name="my_table", table=table) #Convert the Deephaven table back to a pyarrow table - pyarrow_table = table.to_arrow() + pa_table = table.to_arrow() Execute a script server side ############################ diff --git a/py/client/pydeephaven/_arrow.py b/py/client/pydeephaven/_arrow.py new file mode 100644 index 00000000000..7e65301d6af --- /dev/null +++ b/py/client/pydeephaven/_arrow.py @@ -0,0 +1,65 @@ +# +# Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending +# +from typing import Dict + +import pyarrow as pa +from .dherror import DHError + +_ARROW_DH_DATA_TYPE_MAPPING = { + pa.null(): '', + pa.bool_(): 'java.lang.Boolean', + pa.int8(): 'byte', + pa.int16(): 'short', + pa.int32(): 'int', + pa.int64(): 'long', + pa.uint8(): '', + pa.uint16(): 'char', + pa.uint32(): '', + pa.uint64(): '', + pa.float16(): '', + pa.float32(): 'float', + pa.float64(): 'double', + pa.time32('s'): '', + pa.time32('ms'): '', + pa.time64('us'): '', + pa.time64('ns'): '', + pa.timestamp('s'): '', + pa.timestamp('ms'): '', + pa.timestamp('us'): '', + pa.timestamp('ns'): 'io.deephaven.time.DateTime', + pa.date32(): '', + pa.date64(): '', + pa.duration('s'): '', + pa.duration('ms'): '', + pa.duration('us'): '', + pa.duration('ns'): '', + pa.month_day_nano_interval(): '', + pa.binary(): '', + pa.string(): 'java.lang.String', + pa.utf8(): 'java.lang.String', + pa.large_binary(): '', + pa.large_string(): '', + pa.large_utf8(): '', + # decimal128(int precision, int scale=0) + # list_(value_type, int list_size=-1) + # large_list(value_type) + # map_(key_type, item_type[, keys_sorted]) + # struct(fields) + # dictionary(index_type, value_type, …) +} + + +def map_arrow_type(arrow_type) -> Dict[str, str]: + """Maps an Arrow type to the corresponding Deephaven column data type.""" + dh_type = _ARROW_DH_DATA_TYPE_MAPPING.get(arrow_type) + if not dh_type: + # if this is a case of timestamp with tz specified + if isinstance(arrow_type, pa.TimestampType): + dh_type = "io.deephaven.time.DateTime" + + if not dh_type: + raise DHError(message=f'unsupported arrow data type : {arrow_type}, refer to ' + f'deephaven.arrow.SUPPORTED_ARROW_TYPES for the list of supported Arrow types.') + + return {"deephaven:type": dh_type} diff --git a/py/client/pydeephaven/_arrow_flight_service.py b/py/client/pydeephaven/_arrow_flight_service.py index a9f68653ac4..66d6f2ef39f 100644 --- a/py/client/pydeephaven/_arrow_flight_service.py +++ b/py/client/pydeephaven/_arrow_flight_service.py @@ -2,71 +2,20 @@ # Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending # -import pyarrow import pyarrow as pa import pyarrow.flight as paflight + +from pydeephaven._arrow import map_arrow_type from pydeephaven.dherror import DHError from pydeephaven.table import Table -def _map_arrow_type(arrow_type): - arrow_to_dh = { - pa.null(): '', - pa.bool_(): '', - pa.int8(): 'byte', - pa.int16(): 'short', - pa.int32(): 'int', - pa.int64(): 'long', - pa.uint8(): '', - pa.uint16(): 'char', - pa.uint32(): '', - pa.uint64(): '', - pa.float16(): '', - pa.float32(): 'float', - pa.float64(): 'double', - pa.time32('s'): '', - pa.time32('ms'): '', - pa.time64('us'): '', - pa.time64('ns'): 'io.deephaven.time.DateTime', - pa.timestamp('us', tz=None): '', - pa.timestamp('ns', tz=None): '', - pa.date32(): 'java.time.LocalDate', - pa.date64(): 'java.time.LocalDate', - pa.binary(): '', - pa.string(): 'java.lang.String', - pa.utf8(): 'java.lang.String', - pa.large_binary(): '', - pa.large_string(): '', - pa.large_utf8(): '', - # decimal128(int precision, int scale=0) - # list_(value_type, int list_size=-1) - # large_list(value_type) - # map_(key_type, item_type[, keys_sorted]) - # struct(fields) - # dictionary(index_type, value_type, …) - # field(name, type, bool nullable = True[, metadata]) - # schema(fields[, metadata]) - # from_numpy_dtype(dtype) - } - - dh_type = arrow_to_dh.get(arrow_type) - if not dh_type: - # if this is a case of timestamp with tz specified - if isinstance(arrow_type, pa.TimestampType): - dh_type = "io.deephaven.time.DateTime" - - if not dh_type: - raise DHError(f'unsupported arrow data type : {arrow_type}') - - return {"deephaven:type": dh_type} - - class ArrowFlightService: def __init__(self, session, flight_client): self.session = session self._flight_client = flight_client - def import_table(self, data: pyarrow.Table): + def import_table(self, data: pa.Table): try: options = paflight.FlightCallOptions(headers=self.session.grpc_metadata) if not isinstance(data, (pa.Table, pa.RecordBatch)): @@ -74,7 +23,7 @@ def import_table(self, data: pyarrow.Table): ticket = self.session.get_ticket() dh_fields = [] for f in data.schema: - dh_fields.append(pa.field(name=f.name, type=f.type, metadata=_map_arrow_type(f.type))) + dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type))) dh_schema = pa.schema(dh_fields) writer, reader = self._flight_client.do_put( diff --git a/py/client/pydeephaven/_input_table_service.py b/py/client/pydeephaven/_input_table_service.py new file mode 100644 index 00000000000..b4518620aca --- /dev/null +++ b/py/client/pydeephaven/_input_table_service.py @@ -0,0 +1,31 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +from pydeephaven import Table +from pydeephaven.dherror import DHError +from pydeephaven.proto import inputtable_pb2, inputtable_pb2_grpc +from pydeephaven.table import InputTable + + +class InputTableService: + def __init__(self, session): + self.session = session + self._grpc_input_table_stub = inputtable_pb2_grpc.InputTableServiceStub(session.grpc_channel) + + def add(self, input_table: InputTable, table: Table): + try: + response = self._grpc_input_table_stub.AddTableToInputTable( + inputtable_pb2.AddTableRequest(input_table=input_table.ticket, + table_to_add=table.ticket), + metadata=self.session.grpc_metadata) + except Exception as e: + raise DHError("failed to add to InputTable") from e + + def delete(self, input_table: InputTable, table: Table): + try: + response = self._grpc_input_table_stub.DeleteTableFromInputTable( + inputtable_pb2.DeleteTableRequest(input_table=input_table.ticket, + table_to_remove=table.ticket), + metadata=self.session.grpc_metadata) + except Exception as e: + raise DHError("failed to delete from InputTable") from e diff --git a/py/client/pydeephaven/_table_ops.py b/py/client/pydeephaven/_table_ops.py index 15439348158..4375386c66c 100644 --- a/py/client/pydeephaven/_table_ops.py +++ b/py/client/pydeephaven/_table_ops.py @@ -4,6 +4,9 @@ from abc import ABC, abstractmethod from typing import List, Any +import pyarrow as pa + +from pydeephaven._arrow import map_arrow_type from pydeephaven.agg import Aggregation from pydeephaven.constants import SortDirection, MatchRule from pydeephaven.proto import table_pb2, table_pb2_grpc @@ -557,3 +560,43 @@ def make_grpc_request(self, result_id, source_id=None): def make_grpc_request_for_batch(self, result_id, source_id): return table_pb2.BatchTableRequest.Operation( aggregate_all=self.make_grpc_request(result_id=result_id, source_id=source_id)) + + +class CreateInputTableOp(TableOp): + def __init__(self, schema: pa.schema, init_table: Any, key_cols: List[str] = None): + self.schema = schema + self.init_table = init_table + self.key_cols = key_cols + + @classmethod + def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub): + return table_service_stub.CreateInputTable + + def make_grpc_request(self, result_id, source_id=None): + if self.key_cols: + key_backed = table_pb2.CreateInputTableRequest.InputTableKind.InMemoryKeyBacked( + key_columns=self.key_cols) + input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(in_memory_key_backed=key_backed) + else: + append_only = table_pb2.CreateInputTableRequest.InputTableKind.InMemoryAppendOnly() + input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(in_memory_append_only=append_only) + + if self.schema: + dh_fields = [] + for f in self.schema: + dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type))) + dh_schema = pa.schema(dh_fields) + + schema = dh_schema.serialize().to_pybytes() + return table_pb2.CreateInputTableRequest(result_id=result_id, + schema=schema, + kind=input_table_kind) + else: + source_table_id = table_pb2.TableReference(ticket=self.init_table.ticket) + return table_pb2.CreateInputTableRequest(result_id=result_id, + source_table_id=source_table_id, + kind=input_table_kind) + + def make_grpc_request_for_batch(self, result_id, source_id): + return table_pb2.BatchTableRequest.Operation( + create_input_table=self.make_grpc_request(result_id=result_id, source_id=source_id)) diff --git a/py/client/pydeephaven/_table_service.py b/py/client/pydeephaven/_table_service.py index 332477c8ace..9cb45db797e 100644 --- a/py/client/pydeephaven/_table_service.py +++ b/py/client/pydeephaven/_table_service.py @@ -35,7 +35,7 @@ def batch(self, ops): except Exception as e: raise DHError("failed to finish the table batch operation.") from e - def grpc_table_op(self, table: Table, op: TableOp): + def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table): try: result_id = self.session.make_ticket() if table: @@ -47,7 +47,7 @@ def grpc_table_op(self, table: Table, op: TableOp): metadata=self.session.grpc_metadata) if response.success: - return Table(self.session, ticket=response.result_id.ticket, + return table_class(self.session, ticket=response.result_id.ticket, schema_header=response.schema_header, size=response.size, is_static=response.is_static) diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index 5cfd816f2df..96ac7e85d3c 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -7,23 +7,23 @@ from typing import List import grpc -import pyarrow +import pyarrow as pa import pyarrow.flight as paflight from bitstring import BitArray -from pyarrow import ArrowNotImplementedError from pyarrow._flight import ClientMiddlewareFactory, ClientMiddleware, ClientAuthHandler from pydeephaven._app_service import AppService from pydeephaven._arrow_flight_service import ArrowFlightService from pydeephaven._config_service import ConfigService from pydeephaven._console_service import ConsoleService +from pydeephaven._input_table_service import InputTableService from pydeephaven._session_service import SessionService -from pydeephaven._table_ops import TimeTableOp, EmptyTableOp, MergeTablesOp, FetchTableOp +from pydeephaven._table_ops import TimeTableOp, EmptyTableOp, MergeTablesOp, FetchTableOp, CreateInputTableOp from pydeephaven._table_service import TableService from pydeephaven.dherror import DHError from pydeephaven.proto import ticket_pb2 from pydeephaven.query import Query -from pydeephaven.table import Table +from pydeephaven.table import Table, InputTable class _DhClientAuthMiddlewareFactory(ClientMiddlewareFactory): @@ -131,6 +131,7 @@ def __init__(self, host: str = None, port: int = None, auth_type: str = "Anonymo self._console_service = None self._flight_service = None self._app_service = None + self._input_table_service = None self._never_timeout = never_timeout self._keep_alive_timer = None self._session_type = session_type @@ -160,32 +161,32 @@ def grpc_metadata(self): return [(b'authorization', self._auth_token)] @property - def table_service(self): + def table_service(self) -> TableService: if not self._table_service: self._table_service = TableService(self) return self._table_service @property - def session_service(self): + def session_service(self) -> SessionService: if not self._session_service: self._session_service = SessionService(self) return self._session_service @property - def console_service(self): + def console_service(self) -> ConsoleService: if not self._console_service: self._console_service = ConsoleService(self) return self._console_service @property - def flight_service(self): + def flight_service(self) -> ArrowFlightService: if not self._flight_service: self._flight_service = ArrowFlightService(self, self._flight_client) return self._flight_service @property - def app_service(self): + def app_service(self) -> AppService: if not self._app_service: self._app_service = AppService(self) @@ -198,6 +199,13 @@ def config_service(self): return self._config_service + @property + def input_table_service(self) -> InputTableService: + if not self._input_table_service: + self._input_table_service = InputTableService(self) + + return self._input_table_service + def make_ticket(self, ticket_no=None): if not ticket_no: ticket_no = self.get_ticket() @@ -371,7 +379,7 @@ def time_table(self, period: int, start_time: int = None) -> Table: return self.table_service.grpc_table_op(None, table_op) def empty_table(self, size: int) -> Table: - """ create an empty table on the server. + """ Create an empty table on the server. Args: size (int): the size of the empty table in number of rows @@ -385,14 +393,14 @@ def empty_table(self, size: int) -> Table: table_op = EmptyTableOp(size=size) return self.table_service.grpc_table_op(None, table_op) - def import_table(self, data: pyarrow.Table) -> Table: + def import_table(self, data: pa.Table) -> Table: """ Import the pyarrow table as a new Deephaven table on the server. Deephaven supports most of the Arrow data types. However, if the pyarrow table contains any field with a data type not supported by Deephaven, the import operation will fail. Args: - data (pyarrow.Table): a pyarrow Table object + data (pa.Table): a pyarrow Table object Returns: a Table object @@ -431,3 +439,29 @@ def query(self, table: Table) -> Query: DHError """ return Query(self, table) + + def input_table(self, schema: pa.Schema = None, init_table: Table = None, + key_cols: List[str] = None) -> InputTable: + """ Create an InputTable from either Arrow schema or initial table. When key columns are + provided, the InputTable will be keyed, otherwise it will be append-only. + + Args: + schema (pa.Schema): the schema for the InputTable + init_table (Table): the initial table + key_cols (Union[str, Sequence[str]): the name(s) of the key column(s) + + Returns: + an InputTable + + Raises: + DHError, ValueError + """ + if schema is None and init_table is None: + raise ValueError("either arrow schema or init table should be provided.") + elif schema and init_table: + raise ValueError("both arrow schema and init table are provided.") + + table_op = CreateInputTableOp(schema=schema, init_table=init_table, key_cols=key_cols) + input_table = self.table_service.grpc_table_op(None, table_op, table_class=InputTable) + input_table.key_cols = key_cols + return input_table diff --git a/py/client/pydeephaven/table.py b/py/client/pydeephaven/table.py index b0bb3b0705e..5b4077796ad 100644 --- a/py/client/pydeephaven/table.py +++ b/py/client/pydeephaven/table.py @@ -4,7 +4,9 @@ from __future__ import annotations -import pyarrow +from typing import List + +import pyarrow as pa from pydeephaven.dherror import DHError from pydeephaven._table_interface import TableInterface @@ -67,10 +69,10 @@ def _parse_schema(self, schema_header): if not schema_header: return - reader = pyarrow.ipc.open_stream(schema_header) + reader = pa.ipc.open_stream(schema_header) self.schema = reader.schema - def to_arrow(self) -> pyarrow.Table: + def to_arrow(self) -> pa.Table: """ Take a snapshot of the table and return a pyarrow Table. Returns: @@ -80,3 +82,51 @@ def to_arrow(self) -> pyarrow.Table: DHError """ return self.session.flight_service.do_get_table(self) + + +class InputTable(Table): + """InputTable is a subclass of Table that allows the users to dynamically add/delete/modify data in it. There are two + types of InputTable - append-only and keyed. + + The append-only input table is not keyed, all rows are added to the end of the table, and deletions and edits are + not permitted. + + The keyed input tablet has keys for each row and supports addition/deletion/modification of rows by the keys. + """ + + def __init__(self, session, ticket, schema_header=b'', size=None, is_static=None, schema=None): + super().__init__(session=session, ticket=ticket, schema_header=schema_header, size=size, + is_static=is_static, schema=schema) + self.key_cols: List[str] = None + + def add(self, table: Table) -> None: + """Write rows from the provided table to this input table. If this is a keyed input table, added rows with keys + that match existing rows will replace those rows. + + Args: + table (Table): the table that provides the rows to write + + Raises: + DHError + """ + try: + self.session.input_table_service.add(self, table) + except Exception as e: + raise DHError("add to InputTable failed.") from e + + def delete(self, table: Table) -> None: + """Delete the keys contained in the provided table from this keyed input table. If this method is called on an + append-only input table, a PermissionError will be raised. + + Args: + table (Table): the table with the keys to delete + + Raises: + DHError, PermissionError + """ + if not self.key_cols: + raise PermissionError("deletion on an append-only input table is not allowed.") + try: + self.session.input_table_service.delete(self, table) + except Exception as e: + raise DHError("delete data in the InputTable failed.") from e diff --git a/py/client/pydeephaven/utils.py b/py/client/pydeephaven/utils.py index cf433ea7635..e4b6097de52 100644 --- a/py/client/pydeephaven/utils.py +++ b/py/client/pydeephaven/utils.py @@ -2,16 +2,16 @@ # Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending # -import pyarrow +import pyarrow as pa from .dherror import DHError -from ._arrow_flight_service import _map_arrow_type +from ._arrow import map_arrow_type -def is_deephaven_compatible(data_type: pyarrow.DataType) -> bool: +def is_deephaven_compatible(data_type: pa.DataType) -> bool: """ check if the arrow data type is supported by Deephaven. """ try: - dh_type = _map_arrow_type(data_type) + dh_type = map_arrow_type(data_type) return True except DHError: return False diff --git a/py/client/tests/test_session.py b/py/client/tests/test_session.py index 857276e862b..12829057fcd 100644 --- a/py/client/tests/test_session.py +++ b/py/client/tests/test_session.py @@ -6,6 +6,7 @@ from time import sleep import pyarrow as pa +import pandas as pd from pyarrow import csv from pydeephaven import DHError @@ -186,6 +187,68 @@ def test_import_table_dates(self): self.assertEqual(0, len(exception_list)) + def test_input_table(self): + pa_types = [ + pa.bool_(), + pa.int8(), + pa.int16(), + pa.int32(), + pa.int64(), + # Skip due to https://github.com/deephaven/deephaven-core/issues/3605 + # pa.timestamp('ns'), + # pa.timestamp('ns', tz='MST'), + pa.float32(), + pa.float64(), + pa.string(), + ] + pa_data = [ + pa.array([True, False]), + pa.array([2 ** 7 - 1, -2 ** 7 + 1]), + pa.array([2 ** 15 - 1, -2 ** 15 + 1]), + pa.array([2 ** 31 - 1, -2 ** 31 + 1]), + pa.array([2 ** 63 - 1, -2 ** 63 + 1]), + # pa.array([pd.Timestamp('2017-01-01T12:01:01', tz='UTC'), + # pd.Timestamp('2017-01-01T11:01:01', tz='Europe/Paris')]), + # pa.array([pd.Timestamp('2017-01-01T2:01:01', tz='UTC'), + # pd.Timestamp('2017-01-01T1:01:01', tz='Europe/Paris')]), + pa.array([1.1, 2.2], pa.float32()), + pa.array([1.1, 2.2], pa.float64()), + pa.array(["foo", "bar"]), + ] + fields = [pa.field(f"f{i}", ty) for i, ty in enumerate(pa_types)] + schema = pa.schema(fields) + pa_table = pa.table(pa_data, schema=schema) + dh_table = self.session.import_table(pa_table) + + with self.subTest("Create Input Table"): + keyed_input_t = self.session.input_table(schema=schema, key_cols=["f1"]) + pa_table = keyed_input_t.to_arrow() + self.assertEqual(schema, pa_table.schema) + + append_input_t = self.session.input_table(init_table=keyed_input_t) + pa_table = append_input_t.to_arrow() + self.assertEqual(schema, pa_table.schema) + + with self.assertRaises(ValueError): + self.session.input_table(schema=schema, init_table=append_input_t) + with self.assertRaises(ValueError): + self.session.input_table(key_cols=["f0"]) + + with self.subTest("InputTable ops"): + keyed_input_t.add(dh_table) + self.assertEqual(keyed_input_t.snapshot().size, dh_table.size) + keyed_input_t.add(dh_table) + self.assertEqual(keyed_input_t.snapshot().size, dh_table.size) + keyed_input_t.delete(dh_table.select(["f1"])) + self.assertEqual(keyed_input_t.snapshot().size, 0) + + append_input_t.add(dh_table) + self.assertEqual(append_input_t.snapshot().size, dh_table.size) + append_input_t.add(dh_table) + self.assertEqual(append_input_t.snapshot().size, dh_table.size * 2) + with self.assertRaises(PermissionError): + append_input_t.delete(dh_table) + if __name__ == '__main__': unittest.main()