diff --git a/pyintegration/deephaven2/__init__.py b/pyintegration/deephaven2/__init__.py index 5219de1cf36..87476f2e193 100644 --- a/pyintegration/deephaven2/__init__.py +++ b/pyintegration/deephaven2/__init__.py @@ -17,6 +17,7 @@ from .dherror import DHError from .constants import SortDirection from .csv import read as read_csv +from .csv import write as write_csv from .table import empty_table, time_table -__all__ = ["read_csv", "DHError", "time_table", "empty_table", "SortDirection"] +__all__ = ["read_csv", "write_csv", "DHError", "time_table", "empty_table", "SortDirection"] diff --git a/pyintegration/deephaven2/column.py b/pyintegration/deephaven2/column.py index 58f0d4d419f..8668016ba3b 100644 --- a/pyintegration/deephaven2/column.py +++ b/pyintegration/deephaven2/column.py @@ -4,7 +4,7 @@ from dataclasses import dataclass -@dataclass +@dataclass(frozen=True) class Column: """ A Column object represents a column in a Deephaven Table. """ name: str diff --git a/pyintegration/deephaven2/csv.py b/pyintegration/deephaven2/csv.py index 3d55fb2241f..fd1fe8a70b6 100644 --- a/pyintegration/deephaven2/csv.py +++ b/pyintegration/deephaven2/csv.py @@ -5,7 +5,7 @@ Deephaven table out as a CSV file. """ from enum import Enum -from typing import Dict, Any +from typing import Dict, Any, List import jpy @@ -18,6 +18,7 @@ _JInferenceSpecs = jpy.get_type("io.deephaven.csv.InferenceSpecs") _JTableHeader = jpy.get_type("io.deephaven.qst.table.TableHeader") _JCharset = jpy.get_type("java.nio.charset.Charset") +_JCsvTools = jpy.get_type("io.deephaven.csv.CsvTools") class Inference(Enum): @@ -112,4 +113,21 @@ def read(path: str, return Table(j_table=j_table) except Exception as e: - raise DHError(e, "read_csv failed") from e + raise DHError(e, "read csv failed") from e + + +def write(table: Table, path: str, cols: List[str] = []) -> None: + """ Write a table to a standard CSV file. + + Args: + table (Table): the source table + path (str): the path of the CSV file + cols (List[str]): the names of the columns to be written out + + Raises: + DHError + """ + try: + _JCsvTools.writeCsv(table.j_table, False, path, *cols) + except Exception as e: + raise DHError("write csv failed.") from e diff --git a/pyintegration/deephaven2/table.py b/pyintegration/deephaven2/table.py index 7195d70f64b..57cd16a3f0c 100644 --- a/pyintegration/deephaven2/table.py +++ b/pyintegration/deephaven2/table.py @@ -78,8 +78,8 @@ class Table: """ def __init__(self, j_table): - self._j_table = j_table - self._definition = self._j_table.getDefinition() + self.j_table = j_table + self._definition = self.j_table.getDefinition() self._schema = None def __repr__(self): @@ -91,17 +91,17 @@ def __repr__(self): # to make the table visible to DH script session, internal use only def get_dh_table(self): - return self._j_table + return self.j_table @property def size(self) -> int: """ The current number of rows in the table. """ - return self._j_table.size() + return self.j_table.size() @property def is_refreshing(self) -> bool: """ Whether this table is refreshing. """ - return self._j_table.isRefreshing() + return self.j_table.isRefreshing() @property def columns(self): @@ -135,14 +135,14 @@ def to_string(self, num_rows: int = 10, cols: List[str] = []) -> str: DHError """ try: - return _JTableTools.string(self._j_table, num_rows, *cols) + return _JTableTools.string(self.j_table, num_rows, *cols) except Exception as e: raise DHError(e, "table to_string failed") from e # def snapshot(self): # """ Take a snapshot of the table. """ # try: - # return empty_table(0).snapshot(self._j_table) + # return empty_table(0).snapshot(self.j_table) # except Exception as e: # raise DHError("") from e # @@ -165,7 +165,7 @@ def drop_columns(self, cols: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.dropColumns(*cols)) + return Table(j_table=self.j_table.dropColumns(*cols)) except Exception as e: raise DHError(e, "table drop_columns operation failed.") from e @@ -183,7 +183,7 @@ def move_columns(self, idx: int, cols: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.moveColumns(idx, *cols)) + return Table(j_table=self.j_table.moveColumns(idx, *cols)) except Exception as e: raise DHError(e, "table move_columns operation failed.") from e @@ -201,7 +201,7 @@ def move_columns_down(self, cols: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.moveColumnsDown(*cols)) + return Table(j_table=self.j_table.moveColumnsDown(*cols)) except Exception as e: raise DHError(e, "table move_columns_down operation failed.") from e @@ -219,7 +219,7 @@ def move_columns_up(self, cols: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.moveColumnsUp(*cols)) + return Table(j_table=self.j_table.moveColumnsUp(*cols)) except Exception as e: raise DHError(e, "table move_columns_up operation failed.") from e @@ -236,7 +236,7 @@ def rename_columns(self, cols: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.renameColumns(*cols)) + return Table(j_table=self.j_table.renameColumns(*cols)) except Exception as e: raise DHError(e, "table rename_columns operation failed.") from e @@ -253,7 +253,7 @@ def update(self, formulas: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.update(*formulas)) + return Table(j_table=self.j_table.update(*formulas)) except Exception as e: raise DHError(e, "table update operation failed.") from e @@ -270,7 +270,7 @@ def lazy_update(self, formulas: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.lazyUpdate(*formulas)) + return Table(j_table=self.j_table.lazyUpdate(*formulas)) except Exception as e: raise DHError(e, "table lazy_update operation failed.") from e @@ -287,7 +287,7 @@ def view(self, formulas: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.view(*formulas)) + return Table(j_table=self.j_table.view(*formulas)) except Exception as e: raise DHError(e, "table view operation failed.") from e @@ -304,7 +304,7 @@ def update_view(self, formulas: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.updateView(*formulas)) + return Table(j_table=self.j_table.updateView(*formulas)) except Exception as e: raise DHError(e, "table update_view operation failed.") from e @@ -323,8 +323,8 @@ def select(self, formulas: List[str] = []) -> Table: """ try: if not formulas: - return Table(j_table=self._j_table.select()) - return Table(j_table=self._j_table.select(*formulas)) + return Table(j_table=self.j_table.select()) + return Table(j_table=self.j_table.select(*formulas)) except Exception as e: raise DHError(e, "table select operation failed.") from e @@ -343,7 +343,7 @@ def select_distinct(self, cols: List[str] = []) -> Table: DHError """ try: - return Table(j_table=self._j_table.selectDistinct(*cols)) + return Table(j_table=self.j_table.selectDistinct(*cols)) except Exception as e: raise DHError(e, "table select_distinct operation failed.") from e @@ -367,7 +367,7 @@ def where(self, filters: List[str] = []) -> Table: DHError """ try: - return Table(j_table=self._j_table.where(*filters)) + return Table(j_table=self.j_table.where(*filters)) except Exception as e: raise DHError(e, "table where operation failed.") from e @@ -386,7 +386,7 @@ def where_in(self, filter_table: Table, cols: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.whereIn(filter_table._j_table, *cols)) + return Table(j_table=self.j_table.whereIn(filter_table.j_table, *cols)) except Exception as e: raise DHError(e, "table where_in operation failed.") from e @@ -405,7 +405,7 @@ def where_not_in(self, filter_table: Table, cols: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.whereNotIn(filter_table._j_table, *cols)) + return Table(j_table=self.j_table.whereNotIn(filter_table.j_table, *cols)) except Exception as e: raise DHError(e, "table where_not_in operation failed.") from e @@ -423,7 +423,7 @@ def where_one_of(self, filters: List[str] = []) -> Table: DHError """ try: - return Table(j_table=self._j_table.where(_JFilterOr.of(_JFilter.from_(*filters)))) + return Table(j_table=self.j_table.where(_JFilterOr.of(_JFilter.from_(*filters)))) except Exception as e: raise DHError(e, "table where_one_of operation failed.") from e @@ -440,7 +440,7 @@ def head(self, num_rows: int) -> Table: DHError """ try: - return Table(j_table=self._j_table.head(num_rows)) + return Table(j_table=self.j_table.head(num_rows)) except Exception as e: raise DHError(e, "table head operation failed.") from e @@ -457,7 +457,7 @@ def head_pct(self, pct: float) -> Table: DHError """ try: - return Table(j_table=self._j_table.headPct(pct)) + return Table(j_table=self.j_table.headPct(pct)) except Exception as e: raise DHError(e, "table head_pct operation failed.") from e @@ -474,7 +474,7 @@ def tail(self, num_rows: int) -> Table: DHError """ try: - return Table(j_table=self._j_table.tail(num_rows)) + return Table(j_table=self.j_table.tail(num_rows)) except Exception as e: raise DHError(e, "table tail operation failed.") from e @@ -491,7 +491,7 @@ def tail_pct(self, pct: float) -> Table: DHError """ try: - return Table(j_table=self._j_table.tailPct(pct)) + return Table(j_table=self.j_table.tailPct(pct)) except Exception as e: raise DHError(e, "table tail_pct operation failed.") from e @@ -512,7 +512,7 @@ def restrict_sort_to(self, cols: List[str]): DHError """ try: - return self._j_table.restrictSortTo(*cols) + return self.j_table.restrictSortTo(*cols) except Exception as e: raise DHError(e, "table restrict_sort_to operation failed.") from e @@ -530,7 +530,7 @@ def sort_descending(self, order_by: List[str] = []) -> Table: DHError """ try: - return Table(j_table=self._j_table.sortDescending(*order_by)) + return Table(j_table=self.j_table.sortDescending(*order_by)) except Exception as e: raise DHError(e, "table sort_descending operation failed.") from e @@ -544,7 +544,7 @@ def reverse(self) -> Table: DHError """ try: - return Table(j_table=self._j_table.reverse()) + return Table(j_table=self.j_table.reverse()) except Exception as e: raise DHError(e, "table reverse operation failed.") from e @@ -570,9 +570,9 @@ def sort_column(col, dir_): try: if order: sort_columns = [sort_column(col, dir_) for col, dir_ in zip(order_by, order)] - return Table(j_table=_JTWD.sort(self._j_table, *sort_columns)) + return Table(j_table=_JTWD.sort(self.j_table, *sort_columns)) else: - return Table(j_table=self._j_table.sort(*order_by)) + return Table(j_table=self.j_table.sort(*order_by)) except Exception as e: raise DHError(e, "table sort operation failed.") from e @@ -603,9 +603,9 @@ def natural_join(self, table: Table, on: List[str], joins: List[str] = []) -> Ta """ try: if joins: - return Table(j_table=self._j_table.naturalJoin(table._j_table, ",".join(on), ",".join(joins))) + return Table(j_table=self.j_table.naturalJoin(table.j_table, ",".join(on), ",".join(joins))) else: - return Table(j_table=self._j_table.naturalJoin(table._j_table, ",".join(on))) + return Table(j_table=self.j_table.naturalJoin(table.j_table, ",".join(on))) except Exception as e: raise DHError(e, "table natural_join operation failed.") from e @@ -630,9 +630,9 @@ def exact_join(self, table: Table, on: List[str], joins: List[str] = []) -> Tabl """ try: if joins: - return Table(j_table=self._j_table.exactJoin(table._j_table, ",".join(on), ",".join(joins))) + return Table(j_table=self.j_table.exactJoin(table.j_table, ",".join(on), ",".join(joins))) else: - return Table(j_table=self._j_table.exactJoin(table._j_table, ",".join(on))) + return Table(j_table=self.j_table.exactJoin(table.j_table, ",".join(on))) except Exception as e: raise DHError(e, "table exact_join operation failed.") from e @@ -657,9 +657,9 @@ def left_join(self, table: Table, on: List[str], joins: List[str] = []) -> Table """ try: if joins: - return Table(j_table=self._j_table.leftJoin(table._j_table, ",".join(on), ",".join(joins))) + return Table(j_table=self.j_table.leftJoin(table.j_table, ",".join(on), ",".join(joins))) else: - return Table(j_table=self._j_table.leftJoin(table._j_table, ",".join(on))) + return Table(j_table=self.j_table.leftJoin(table.j_table, ",".join(on))) except Exception as e: raise DHError(e, "table left_join operation failed.") from e @@ -684,9 +684,9 @@ def join(self, table: Table, on: List[str], joins: List[str] = []) -> Table: """ try: if joins: - return Table(j_table=self._j_table.join(table._j_table, ",".join(on), ",".join(joins))) + return Table(j_table=self.j_table.join(table.j_table, ",".join(on), ",".join(joins))) else: - return Table(j_table=self._j_table.join(table._j_table, ",".join(on))) + return Table(j_table=self.j_table.join(table.j_table, ",".join(on))) except Exception as e: raise DHError(e, "table join operation failed.") from e @@ -712,9 +712,9 @@ def aj(self, table: Table, on: List[str], joins: List[str] = []) -> Table: """ try: if joins: - return Table(j_table=self._j_table.aj(table._j_table, ",".join(on), ",".join(joins))) + return Table(j_table=self.j_table.aj(table.j_table, ",".join(on), ",".join(joins))) else: - return Table(j_table=self._j_table.aj(table._j_table, ",".join(on))) + return Table(j_table=self.j_table.aj(table.j_table, ",".join(on))) except Exception as e: raise DHError(e, "table as-of join operation failed.") from e @@ -740,9 +740,9 @@ def raj(self, table: Table, on: List[str], joins: List[str] = []) -> Table: """ try: if joins: - return Table(j_table=self._j_table.raj(table._j_table, ",".join(on), ",".join(joins))) + return Table(j_table=self.j_table.raj(table.j_table, ",".join(on), ",".join(joins))) else: - return Table(j_table=self._j_table.raj(table._j_table, ",".join(on))) + return Table(j_table=self.j_table.raj(table.j_table, ",".join(on))) except Exception as e: raise DHError(e, "table reverse-as-of join operation failed.") from e @@ -765,7 +765,7 @@ def head_by(self, num_rows: int, by: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.headBy(num_rows, *by)) + return Table(j_table=self.j_table.headBy(num_rows, *by)) except Exception as e: raise DHError(e, "table head_by operation failed.") from e @@ -783,7 +783,7 @@ def tail_by(self, num_rows: int, by: List[str]) -> Table: DHError """ try: - return Table(j_table=self._j_table.tailBy(num_rows, *by)) + return Table(j_table=self.j_table.tailBy(num_rows, *by)) except Exception as e: raise DHError(e, "table tail_by operation failed.") from e @@ -802,9 +802,9 @@ def group_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.groupBy(*by)) + return Table(j_table=self.j_table.groupBy(*by)) else: - return Table(j_table=self._j_table.groupBy()) + return Table(j_table=self.j_table.groupBy()) except Exception as e: raise DHError(e, "table group operation failed.") from e @@ -824,9 +824,9 @@ def ungroup(self, cols: List[str] = []) -> Table: """ try: if cols: - return Table(j_table=self._j_table.ungroup(*cols)) + return Table(j_table=self.j_table.ungroup(*cols)) else: - return Table(j_table=self._j_table.ungroup()) + return Table(j_table=self.j_table.ungroup()) except Exception as e: raise DHError(e, "table ungroup operation failed.") from e @@ -844,9 +844,9 @@ def first_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.firstBy(*by)) + return Table(j_table=self.j_table.firstBy(*by)) else: - return Table(j_table=self._j_table.firstBy()) + return Table(j_table=self.j_table.firstBy()) except Exception as e: raise DHError(e, "table first_by operation failed.") from e @@ -864,9 +864,9 @@ def last_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.lastBy(*by)) + return Table(j_table=self.j_table.lastBy(*by)) else: - return Table(j_table=self._j_table.lastBy()) + return Table(j_table=self.j_table.lastBy()) except Exception as e: raise DHError(e, "table last_by operation failed.") from e @@ -884,9 +884,9 @@ def sum_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.sumBy(*by)) + return Table(j_table=self.j_table.sumBy(*by)) else: - return Table(j_table=self._j_table.sumBy()) + return Table(j_table=self.j_table.sumBy()) except Exception as e: raise DHError(e, "table sum_by operation failed.") from e @@ -904,9 +904,9 @@ def avg_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.avgBy(*by)) + return Table(j_table=self.j_table.avgBy(*by)) else: - return Table(j_table=self._j_table.avgBy()) + return Table(j_table=self.j_table.avgBy()) except Exception as e: raise DHError(e, "table avg_by operation failed.") from e @@ -924,9 +924,9 @@ def std_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.stdBy(*by)) + return Table(j_table=self.j_table.stdBy(*by)) else: - return Table(j_table=self._j_table.stdBy()) + return Table(j_table=self.j_table.stdBy()) except Exception as e: raise DHError(e, "table std_by operation failed.") from e @@ -944,9 +944,9 @@ def var_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.varBy(*by)) + return Table(j_table=self.j_table.varBy(*by)) else: - return Table(j_table=self._j_table.varBy()) + return Table(j_table=self.j_table.varBy()) except Exception as e: raise DHError(e, "table var_by operation failed.") from e @@ -964,9 +964,9 @@ def median_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.medianBy(*by)) + return Table(j_table=self.j_table.medianBy(*by)) else: - return Table(j_table=self._j_table.medianBy()) + return Table(j_table=self.j_table.medianBy()) except Exception as e: raise DHError(e, "table median_by operation failed.") from e @@ -984,9 +984,9 @@ def min_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.minBy(*by)) + return Table(j_table=self.j_table.minBy(*by)) else: - return Table(j_table=self._j_table.minBy()) + return Table(j_table=self.j_table.minBy()) except Exception as e: raise DHError(e, "table min_by operation failed.") from e @@ -1004,9 +1004,9 @@ def max_by(self, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.maxBy(*by)) + return Table(j_table=self.j_table.maxBy(*by)) else: - return Table(j_table=self._j_table.maxBy()) + return Table(j_table=self.j_table.maxBy()) except Exception as e: raise DHError(e, "table max_by operation failed.") from e @@ -1025,9 +1025,9 @@ def count_by(self, col: str, by: List[str] = []) -> Table: """ try: if by: - return Table(j_table=self._j_table.countBy(col, *by)) + return Table(j_table=self.j_table.countBy(col, *by)) else: - return Table(j_table=self._j_table.countBy(col)) + return Table(j_table=self.j_table.countBy(col)) except Exception as e: raise DHError(e, "table count_by operation failed.") from e @@ -1046,7 +1046,7 @@ def agg_by(self, aggs: List[Aggregation], by: List[str]) -> Table: DHError """ try: - return Table(j_table=_JTWD.aggBy(self._j_table, _JAggHolder(*[agg.j_agg for agg in aggs]), *by)) + return Table(j_table=_JTWD.aggBy(self.j_table, _JAggHolder(*[agg.j_agg for agg in aggs]), *by)) except Exception as e: raise DHError(e, "table agg_by operation failed.") from e # endregion diff --git a/pyintegration/tests/test_csv.py b/pyintegration/tests/test_csv.py index 74a4554723e..6d2edeee48a 100644 --- a/pyintegration/tests/test_csv.py +++ b/pyintegration/tests/test_csv.py @@ -4,7 +4,7 @@ import unittest from deephaven2 import dtypes, DHError -from deephaven2 import read_csv +from deephaven2 import read_csv, write_csv from tests.testbase import BaseTestCase @@ -49,6 +49,24 @@ def test_read_error_quote(self): self.assertIsNotNone(cm.exception.compact_traceback) + def test_write(self): + t = read_csv("tests/data/small_sample.csv") + write_csv(t, "./test_write.csv") + t_cols = [col.name for col in t.columns] + t = read_csv("./test_write.csv") + self.assertEqual(t_cols, [col.name for col in t.columns]) + + col_names = ["Strings", "Longs", "Floats"] + col_types = [dtypes.string, dtypes.long, dtypes.float_] + table_header = {k: v for k, v in zip(col_names, col_types)} + t = read_csv('tests/data/test_csv.csv', header=table_header) + write_csv(t, "./test_write.csv", cols=col_names) + t = read_csv('./test_write.csv') + self.assertEqual(col_names, [c.name for c in t.columns]) + + import os + os.remove("./test_write.csv") + if __name__ == '__main__': unittest.main() diff --git a/pyintegration/tests/test_table.py b/pyintegration/tests/test_table.py index 79f591c1dd3..9ba91ea901d 100644 --- a/pyintegration/tests/test_table.py +++ b/pyintegration/tests/test_table.py @@ -40,7 +40,7 @@ def test_time_table(self): t = time_table("00:00:01", start_time="2021-11-06T13:21:00 NY") self.assertEqual(1, len(t.columns)) self.assertTrue(t.is_refreshing) - self.assertEqual("2021-11-06T13:21:00.000000000 NY", t._j_table.getColumnSource("Timestamp").get(0).toString()) + self.assertEqual("2021-11-06T13:21:00.000000000 NY", t.j_table.getColumnSource("Timestamp").get(0).toString()) def test_repr(self): print(self.test_table)