Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add InputTable in the Python client API #3612

Merged
merged 1 commit into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions py/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,19 +205,19 @@ session.bind_table(name="my_table", table=table)
Deephaven natively supports [PyArrow tables](https://arrow.apache.org/docs/python/index.html). This example converts between a PyArrow table and a Deephaven table.

```
import pyarrow
import pyarrow as pa
from pydeephaven import Session

session = Session()

arr = pyarrow.array([4,5,6], type=pyarrow.int32())
pyarrow_table = pyarrow.Table.from_arrays([arr], names=["Integers"])
arr = pa.array([4,5,6], type=pa.int32())
pa_table = pa.Table.from_arrays([arr], names=["Integers"])

table = session.import_table(pyarrow_table)
table = session.import_table(pa_table)
session.bind_table(name="my_table", table=table)

#Convert the Deephaven table back to a pyarrow table
pyarrow_table = table.to_arrow()
pa_table = table.to_arrow()
```

## Execute a script server side
Expand Down
10 changes: 5 additions & 5 deletions py/client/docs/source/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,23 @@ Convert a PyArrow table to a Deephaven table

Deephaven natively supports PyArrow tables. This example converts between a PyArrow table and a Deephaven table:

import pyarrow
import pyarrow as pa

from pydeephaven import Session

session = Session()

arr = pyarrow.array([4,5,6], type=pyarrow.int32())
arr = pa.array([4,5,6], type=pa.int32())

pyarrow_table = pyarrow.Table.from_arrays([arr], names=["Integers"])
pa_table = pa.Table.from_arrays([arr], names=["Integers"])

table = session.import_table(pyarrow_table)
table = session.import_table(pa_table)

session.bind_table(name="my_table", table=table)

#Convert the Deephaven table back to a pyarrow table

pyarrow_table = table.to_arrow()
pa_table = table.to_arrow()

Execute a script server side
############################
Expand Down
65 changes: 65 additions & 0 deletions py/client/pydeephaven/_arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
#
from typing import Dict

import pyarrow as pa
from .dherror import DHError

_ARROW_DH_DATA_TYPE_MAPPING = {
pa.null(): '',
pa.bool_(): 'java.lang.Boolean',
pa.int8(): 'byte',
pa.int16(): 'short',
pa.int32(): 'int',
pa.int64(): 'long',
pa.uint8(): '',
pa.uint16(): 'char',
pa.uint32(): '',
pa.uint64(): '',
pa.float16(): '',
pa.float32(): 'float',
pa.float64(): 'double',
pa.time32('s'): '',
pa.time32('ms'): '',
pa.time64('us'): '',
pa.time64('ns'): '',
pa.timestamp('s'): '',
pa.timestamp('ms'): '',
pa.timestamp('us'): '',
pa.timestamp('ns'): 'io.deephaven.time.DateTime',
pa.date32(): '',
pa.date64(): '',
pa.duration('s'): '',
pa.duration('ms'): '',
pa.duration('us'): '',
pa.duration('ns'): '',
pa.month_day_nano_interval(): '',
pa.binary(): '',
pa.string(): 'java.lang.String',
pa.utf8(): 'java.lang.String',
pa.large_binary(): '',
pa.large_string(): '',
pa.large_utf8(): '',
# decimal128(int precision, int scale=0)
# list_(value_type, int list_size=-1)
# large_list(value_type)
# map_(key_type, item_type[, keys_sorted])
# struct(fields)
# dictionary(index_type, value_type, …)
}


def map_arrow_type(arrow_type) -> Dict[str, str]:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
"""Maps an Arrow type to the corresponding Deephaven column data type."""
dh_type = _ARROW_DH_DATA_TYPE_MAPPING.get(arrow_type)
if not dh_type:
# if this is a case of timestamp with tz specified
if isinstance(arrow_type, pa.TimestampType):
dh_type = "io.deephaven.time.DateTime"

if not dh_type:
raise DHError(message=f'unsupported arrow data type : {arrow_type}, refer to '
f'deephaven.arrow.SUPPORTED_ARROW_TYPES for the list of supported Arrow types.')

return {"deephaven:type": dh_type}
59 changes: 4 additions & 55 deletions py/client/pydeephaven/_arrow_flight_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,79 +2,28 @@
# Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
#

import pyarrow
import pyarrow as pa
import pyarrow.flight as paflight

from pydeephaven._arrow import map_arrow_type
from pydeephaven.dherror import DHError
from pydeephaven.table import Table


def _map_arrow_type(arrow_type):
arrow_to_dh = {
pa.null(): '',
pa.bool_(): '',
pa.int8(): 'byte',
pa.int16(): 'short',
pa.int32(): 'int',
pa.int64(): 'long',
pa.uint8(): '',
pa.uint16(): 'char',
pa.uint32(): '',
pa.uint64(): '',
pa.float16(): '',
pa.float32(): 'float',
pa.float64(): 'double',
pa.time32('s'): '',
pa.time32('ms'): '',
pa.time64('us'): '',
pa.time64('ns'): 'io.deephaven.time.DateTime',
pa.timestamp('us', tz=None): '',
pa.timestamp('ns', tz=None): '',
pa.date32(): 'java.time.LocalDate',
pa.date64(): 'java.time.LocalDate',
pa.binary(): '',
pa.string(): 'java.lang.String',
pa.utf8(): 'java.lang.String',
pa.large_binary(): '',
pa.large_string(): '',
pa.large_utf8(): '',
# decimal128(int precision, int scale=0)
# list_(value_type, int list_size=-1)
# large_list(value_type)
# map_(key_type, item_type[, keys_sorted])
# struct(fields)
# dictionary(index_type, value_type, …)
# field(name, type, bool nullable = True[, metadata])
# schema(fields[, metadata])
# from_numpy_dtype(dtype)
}

dh_type = arrow_to_dh.get(arrow_type)
if not dh_type:
# if this is a case of timestamp with tz specified
if isinstance(arrow_type, pa.TimestampType):
dh_type = "io.deephaven.time.DateTime"

if not dh_type:
raise DHError(f'unsupported arrow data type : {arrow_type}')

return {"deephaven:type": dh_type}


class ArrowFlightService:
def __init__(self, session, flight_client):
self.session = session
self._flight_client = flight_client

def import_table(self, data: pyarrow.Table):
def import_table(self, data: pa.Table):
try:
options = paflight.FlightCallOptions(headers=self.session.grpc_metadata)
if not isinstance(data, (pa.Table, pa.RecordBatch)):
raise DHError("source data must be either a pa table or RecordBatch.")
ticket = self.session.get_ticket()
dh_fields = []
for f in data.schema:
dh_fields.append(pa.field(name=f.name, type=f.type, metadata=_map_arrow_type(f.type)))
dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type)))
dh_schema = pa.schema(dh_fields)

writer, reader = self._flight_client.do_put(
Expand Down
31 changes: 31 additions & 0 deletions py/client/pydeephaven/_input_table_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
from pydeephaven import Table
from pydeephaven.dherror import DHError
from pydeephaven.proto import inputtable_pb2, inputtable_pb2_grpc
from pydeephaven.table import InputTable


class InputTableService:
def __init__(self, session):
self.session = session
self._grpc_input_table_stub = inputtable_pb2_grpc.InputTableServiceStub(session.grpc_channel)

def add(self, input_table: InputTable, table: Table):
try:
response = self._grpc_input_table_stub.AddTableToInputTable(
inputtable_pb2.AddTableRequest(input_table=input_table.ticket,
table_to_add=table.ticket),
metadata=self.session.grpc_metadata)
except Exception as e:
raise DHError("failed to add to InputTable") from e

def delete(self, input_table: InputTable, table: Table):
try:
response = self._grpc_input_table_stub.DeleteTableFromInputTable(
inputtable_pb2.DeleteTableRequest(input_table=input_table.ticket,
table_to_remove=table.ticket),
metadata=self.session.grpc_metadata)
except Exception as e:
raise DHError("failed to delete from InputTable") from e
43 changes: 43 additions & 0 deletions py/client/pydeephaven/_table_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from abc import ABC, abstractmethod
from typing import List, Any

import pyarrow as pa

from pydeephaven._arrow import map_arrow_type
from pydeephaven.agg import Aggregation
from pydeephaven.constants import SortDirection, MatchRule
from pydeephaven.proto import table_pb2, table_pb2_grpc
Expand Down Expand Up @@ -557,3 +560,43 @@ def make_grpc_request(self, result_id, source_id=None):
def make_grpc_request_for_batch(self, result_id, source_id):
return table_pb2.BatchTableRequest.Operation(
aggregate_all=self.make_grpc_request(result_id=result_id, source_id=source_id))


class CreateInputTableOp(TableOp):
def __init__(self, schema: pa.schema, init_table: Any, key_cols: List[str] = None):
self.schema = schema
self.init_table = init_table
self.key_cols = key_cols

@classmethod
def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub):
return table_service_stub.CreateInputTable

def make_grpc_request(self, result_id, source_id=None):
if self.key_cols:
key_backed = table_pb2.CreateInputTableRequest.InputTableKind.InMemoryKeyBacked(
key_columns=self.key_cols)
input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(in_memory_key_backed=key_backed)
else:
append_only = table_pb2.CreateInputTableRequest.InputTableKind.InMemoryAppendOnly()
input_table_kind = table_pb2.CreateInputTableRequest.InputTableKind(in_memory_append_only=append_only)

if self.schema:
dh_fields = []
for f in self.schema:
dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type)))
dh_schema = pa.schema(dh_fields)

schema = dh_schema.serialize().to_pybytes()
return table_pb2.CreateInputTableRequest(result_id=result_id,
schema=schema,
kind=input_table_kind)
else:
source_table_id = table_pb2.TableReference(ticket=self.init_table.ticket)
return table_pb2.CreateInputTableRequest(result_id=result_id,
source_table_id=source_table_id,
kind=input_table_kind)

def make_grpc_request_for_batch(self, result_id, source_id):
return table_pb2.BatchTableRequest.Operation(
create_input_table=self.make_grpc_request(result_id=result_id, source_id=source_id))
4 changes: 2 additions & 2 deletions py/client/pydeephaven/_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def batch(self, ops):
except Exception as e:
raise DHError("failed to finish the table batch operation.") from e

def grpc_table_op(self, table: Table, op: TableOp):
def grpc_table_op(self, table: Table, op: TableOp, table_class: type = Table):
try:
result_id = self.session.make_ticket()
if table:
Expand All @@ -47,7 +47,7 @@ def grpc_table_op(self, table: Table, op: TableOp):
metadata=self.session.grpc_metadata)

if response.success:
return Table(self.session, ticket=response.result_id.ticket,
return table_class(self.session, ticket=response.result_id.ticket,
schema_header=response.schema_header,
size=response.size,
is_static=response.is_static)
Expand Down
Loading