From ae32bfd1827b37d388ed3ab482bb828de11261ab Mon Sep 17 00:00:00 2001
From: JJ Brosnan <84038776+jjbrosnan@users.noreply.github.com>
Date: Thu, 9 Dec 2021 12:08:41 -0500
Subject: [PATCH] Java gatherer (#1523)

* Push of gatherer after performance and memory leak testing
* Add Gatherer wrapper to Python learn submodule
* Changes from ./gradlew :Integrations:spotlessApply
* Minor update and rename to Pythonic conventions
* Fix typo
* Another typo
* Add _defineSymbols call so class wrappers get defined
* Changes. Not fully ready, but I don't want to stash them yet.
* More updates. Still not ready, but I want to save these and work on deephaven.java for a bit
* Updates
* Remove print statement for typeString
* Replace functions for each data type with one single function
* Add support for Python built-in types, not just NumPy types
* Fix typo
* Convert Python built-in types to NumPy dtypes
* Updates from Chip's review. Still testing
* Updates per Chip's review. Code has been tested. I will add example testing code in a comment on the PR
* spotlessApply
* Changes - remove transpose altogether, move transferrer to learn.gather
* spotlessApply so checks will pass
* Add unit tests for Python and Java. The Python unit tests may fail because of the boolean issue (issue 1590)
* Update tests. Java tests still fail, and Python will be updated. But the code has been cleaned up
* Minor clean up of Python unit tests
* Updates to unit tests, make IndexSet public for use in Python unit tests
* spotless apply
* Fix Python test syntax
* Major changes to Gather functions, and updates to tests/Python code in accordance with them
* Fix Python test
* Fix Java tests
* Remove commented-out Java test
* Add row- and column-major functions. Also update tests in Java/Python and the corresponding Python code
* spotlessApply
* Fix for Python
* Fix Python test (reference to old IndexSet)
* Changes from Chip's review
* spotlessApply
* Fix Python
* Updates from Chip's review
* spotlessApply
* Updates from Chip's review
* Fix return to fix Python test
* Chip's suggestion for comment in enum
* Put comments below enum values to make Sphinx happy
---
 .../python/deephaven/learn/__init__.py        |   6 +-
 .../python/deephaven/learn/gather/__init__.py | 122 ++++
 Integrations/python/test/test_learn_gather.py | 170 +++++
 .../integrations/learn/gather/NumPy.java      | 670 ++++++++++++++++++
 .../integrations/learn/gather/NumPyTest.java  | 167 +++++
 5 files changed, 1134 insertions(+), 1 deletion(-)
 create mode 100644 Integrations/python/deephaven/learn/gather/__init__.py
 create mode 100644 Integrations/python/test/test_learn_gather.py
 create mode 100644 Integrations/src/main/java/io/deephaven/integrations/learn/gather/NumPy.java
 create mode 100644 Integrations/src/test/java/io/deephaven/integrations/learn/gather/NumPyTest.java
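For orientation, the API this patch adds is used roughly as follows. This is a sketch assembled from the unit tests further down in the patch; it assumes a Python session where the Deephaven JVM is already initialized.

```python
# Minimal sketch of the gather API added by this patch (assumes an
# initialized JVM with Deephaven available, as in the unit tests below).
import numpy as np
from deephaven import TableTools
from deephaven.learn import gather

table = TableTools.emptyTable(10).update("X = i", "Y = 2 * i")
rows = table.getRowSet()
cols = [table.getColumnSource(name) for name in ["X", "Y"]]

# Gather the table into a 10 x 2 NumPy array of 32-bit ints.
data = gather.table_to_numpy_2d(rows, cols, gather.MemoryLayout.ROW_MAJOR, np.intc)
print(data.shape)  # (10, 2)
```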
jpy.get_type("io.deephaven.integrations.learn.Scatterer") - # every module method that invokes Java classes should be decorated with @_passThrough @wrapt.decorator def _passThrough(wrapped, instance, args, kwargs): @@ -55,6 +55,10 @@ def _passThrough(wrapped, instance, args, kwargs): _defineSymbols() return wrapped(*args, **kwargs) +try: + _defineSymbols() +except Exception as e: + pass @_passThrough class Input: diff --git a/Integrations/python/deephaven/learn/gather/__init__.py b/Integrations/python/deephaven/learn/gather/__init__.py new file mode 100644 index 00000000000..e04e9298987 --- /dev/null +++ b/Integrations/python/deephaven/learn/gather/__init__.py @@ -0,0 +1,122 @@ +# +# Copyright (c) 2016 - 2021 Deephaven Data Labs and Patent Pending +# +""" +Utilities for gathering Deephaven table data into Python objects +""" + +import numpy as np +import enum +import jpy +import wrapt + +# None until the first _defineSymbols() call +_gatherer = None + +def _defineSymbols(): + if not jpy.has_jvm(): + raise SystemError("No java functionality can be used until the JVM has been initialized through the jpy module") + + global _gatherer + global Layout + + if _gatherer is None: + _gatherer = jpy.get_type("io.deephaven.integrations.learn.gather.NumPy") + +class MemoryLayout(enum.Enum): + """ + Memory layouts for an array. + """ + ROW_MAJOR = True + """Row-major memory layout.""" + COLUMN_MAJOR = False + """Column-major memory layout.""" + C = True + """Memory layout consistent with C arrays (row-major).""" + FORTRAN = False + """Memory layout consistent with Fortran arrays (column-major).""" + + def __init__(self, is_row_major): + self.is_row_major = is_row_major + + +# Every method that depends on symbols defined via _defineSymbols() should be decorated with @_passThrough +@wrapt.decorator +def _passThrough(wrapped, instance, args, kwargs): + """ + For decoration of module methods, to define necessary symbols at runtime + + :param wrapped: the method to be decorated + :param instance: the object to which the wrapped function was bound when it was called + :param args: the argument list for `wrapped` + :param kwargs: the keyword argument dictionary for `wrapped` + :return: the decorated version of the method + """ + + _defineSymbols() + return wrapped(*args, **kwargs) + +try: + _defineSymbols() +except Exception as e: + pass + +@_passThrough +def convert_to_numpy_dtype(dtype): + """ + Convert an input type to the corresponding NumPy data type + + :param dtype: A Python type + """ + if dtype.__module__ == np.__name__: + return dtype + elif dtype == bool: + dtype = np.bool_ + elif dtype == float: + dtype = np.double + elif dtype == int: + dtype = np.intc + else: + raise ValueError(f"{dtype} is not a data type that can be converted to a NumPy dtype.") + return dtype + +@_passThrough +def table_to_numpy_2d(row_set, col_set, order:MemoryLayout = MemoryLayout.ROW_MAJOR, dtype:np.dtype = np.intc): + """ + Convert Deephaven table data to a 2d NumPy array of the appropriate size + + :param row_set: A RowSequence describing the number of rows in the table + :param col_set: ColumnSources describing which columns to copy + :param order: :param order: The desired memory layout of the output array + :param dtype: The desired NumPy data type of the output NumPy array + :return: A NumPy ndarray + """ + + if not(isinstance(order, MemoryLayout)): + raise ValueError(f"Invalid major order {order}. 
diff --git a/Integrations/python/deephaven/learn/gather/__init__.py b/Integrations/python/deephaven/learn/gather/__init__.py
new file mode 100644
index 00000000000..e04e9298987
--- /dev/null
+++ b/Integrations/python/deephaven/learn/gather/__init__.py
@@ -0,0 +1,122 @@
+#
+# Copyright (c) 2016 - 2021 Deephaven Data Labs and Patent Pending
+#
+"""
+Utilities for gathering Deephaven table data into Python objects
+"""
+
+import numpy as np
+import enum
+import jpy
+import wrapt
+
+# None until the first _defineSymbols() call
+_gatherer = None
+
+def _defineSymbols():
+    if not jpy.has_jvm():
+        raise SystemError("No java functionality can be used until the JVM has been initialized through the jpy module")
+
+    global _gatherer
+
+    if _gatherer is None:
+        _gatherer = jpy.get_type("io.deephaven.integrations.learn.gather.NumPy")
+
+class MemoryLayout(enum.Enum):
+    """
+    Memory layouts for an array.
+    """
+    ROW_MAJOR = True
+    """Row-major memory layout."""
+    COLUMN_MAJOR = False
+    """Column-major memory layout."""
+    C = True
+    """Memory layout consistent with C arrays (row-major)."""
+    FORTRAN = False
+    """Memory layout consistent with Fortran arrays (column-major)."""
+
+    def __init__(self, is_row_major):
+        self.is_row_major = is_row_major
+
+
+# Every method that depends on symbols defined via _defineSymbols() should be decorated with @_passThrough
+@wrapt.decorator
+def _passThrough(wrapped, instance, args, kwargs):
+    """
+    For decoration of module methods, to define necessary symbols at runtime
+
+    :param wrapped: the method to be decorated
+    :param instance: the object to which the wrapped function was bound when it was called
+    :param args: the argument list for `wrapped`
+    :param kwargs: the keyword argument dictionary for `wrapped`
+    :return: the decorated version of the method
+    """
+
+    _defineSymbols()
+    return wrapped(*args, **kwargs)
+
+try:
+    _defineSymbols()
+except Exception:
+    pass
+
+@_passThrough
+def convert_to_numpy_dtype(dtype):
+    """
+    Convert an input type to the corresponding NumPy data type
+
+    :param dtype: A Python type
+    :return: the corresponding NumPy dtype
+    """
+    if dtype.__module__ == np.__name__:
+        return dtype
+    elif dtype == bool:
+        dtype = np.bool_
+    elif dtype == float:
+        dtype = np.double
+    elif dtype == int:
+        dtype = np.intc
+    else:
+        raise ValueError(f"{dtype} is not a data type that can be converted to a NumPy dtype.")
+    return dtype
+
+@_passThrough
+def table_to_numpy_2d(row_set, col_set, order:MemoryLayout = MemoryLayout.ROW_MAJOR, dtype:np.dtype = np.intc):
+    """
+    Convert Deephaven table data to a 2d NumPy array of the appropriate size
+
+    :param row_set: A RowSequence indicating the rows of table data to copy
+    :param col_set: ColumnSources describing which columns to copy
+    :param order: The desired memory layout of the output array
+    :param dtype: The desired NumPy data type of the output NumPy array
+    :return: A NumPy ndarray
+    """
+
+    if not(isinstance(order, MemoryLayout)):
+        raise ValueError(f"Invalid major order {order}. Please use an enum value from MemoryLayout.")
+
+    dtype = convert_to_numpy_dtype(dtype)
+
+    if dtype == np.byte:
+        buffer = _gatherer.tensorBuffer2DByte(row_set, col_set, order.is_row_major)
+    elif dtype == np.short:
+        buffer = _gatherer.tensorBuffer2DShort(row_set, col_set, order.is_row_major)
+    elif dtype == np.intc:
+        buffer = _gatherer.tensorBuffer2DInt(row_set, col_set, order.is_row_major)
+    elif dtype == np.int_:
+        buffer = _gatherer.tensorBuffer2DLong(row_set, col_set, order.is_row_major)
+    elif dtype == np.single:
+        buffer = _gatherer.tensorBuffer2DFloat(row_set, col_set, order.is_row_major)
+    elif dtype == np.double:
+        buffer = _gatherer.tensorBuffer2DDouble(row_set, col_set, order.is_row_major)
+    else:
+        raise ValueError(f"Data type {dtype} is not supported.")
+
+    tensor = np.frombuffer(buffer, dtype = dtype)
+
+    if order.is_row_major:
+        tensor.shape = (len(col_set), row_set.intSize())
+        return tensor.T
+    else:
+        tensor.shape = (row_set.intSize(), len(col_set))
+        return tensor
\ No newline at end of file
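To illustrate the dtype coercion above: Python built-ins map onto fixed NumPy dtypes before a matching Java buffer method is chosen, while NumPy dtypes pass through unchanged. A small sketch, assuming an initialized JVM so the `@_passThrough` wrapper succeeds:

```python
# Example of the dtype coercion performed by convert_to_numpy_dtype.
import numpy as np
from deephaven.learn import gather

assert gather.convert_to_numpy_dtype(int) == np.intc      # -> tensorBuffer2DInt
assert gather.convert_to_numpy_dtype(float) == np.double  # -> tensorBuffer2DDouble
assert gather.convert_to_numpy_dtype(bool) == np.bool_
assert gather.convert_to_numpy_dtype(np.short) == np.short  # NumPy dtypes pass through
```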
true : false" + ) + cls.byte_table = TableTools.emptyTable(100).update( + "X = (byte)i", + "Y = (byte)(100 - X)", + "Z = (byte)(-101 + X)" + ) + cls.short_table = TableTools.emptyTable(100).update( + "X = (short)i", + "Y = (short)(100 - X)", + "Z = (short)(-101 + X)" + ) + cls.int_table = TableTools.emptyTable(100).update( + "X = (int)i", + "Y = 100 - X", + "Z = -101 + X" + ) + cls.long_table = TableTools.emptyTable(100).update( + "X = (long)i", + "Y = 100 - X", + "Z = -101 + X" + ) + cls.float_table = TableTools.emptyTable(100).update( + "X = (float)i", + "Y = (float)sqrt(X)", + "Z = (float)sqrt(Y)" + ) + cls.double_table = TableTools.emptyTable(100).update( + "X = (double)i", + "Y = sqrt(X)", + "Z = sqrt(Y)" + ) + # NumPy arrays + cls.bool_array = \ + np.array([[True, False, True], [True, False, False]] * 50, + dtype = np.bool_) + cls.byte_array = np.vstack(( + np.arange(0, 100, dtype = np.byte), + np.arange(100, 0, -1, dtype = np.byte), + np.arange(-101, -1, dtype = np.byte) + )).T + cls.short_array = np.vstack(( + np.arange(0, 100, dtype = np.short), + np.arange(100, 0, -1, dtype = np.short), + np.arange(-101, -1, dtype = np.short) + )).T + cls.int_array = np.vstack(( + np.arange(0, 100, dtype = np.intc), + np.arange(100, 0, -1, dtype = np.intc), + np.arange(-101, -1, dtype = np.intc) + )).T + cls.long_array = np.vstack(( + np.arange(0, 100, dtype = np.int_), + np.arange(100, 0, -1, dtype = np.int_), + np.arange(-101, -1, dtype = np.int_) + )).T + cls.float_array = np.vstack(( + np.arange(0, 100, dtype = np.single), + np.sqrt(np.arange(0, 100, dtype = np.single)), + np.sqrt(np.sqrt(np.arange(0, 100, dtype = np.single))) + )).T + cls.double_array = np.vstack(( + np.arange(0, 100, dtype = np.double), + np.sqrt(np.arange(0, 100, dtype = np.double)), + np.sqrt(np.sqrt(np.arange(0, 100, dtype = np.double))) + )).T + + # Model for learn to use when dtype = [np.bool_] + def boolean_model(self, features): + return np.count_nonzero(features, axis = 1) < 2 + + # Model for learn to use when dtype = [np.byte, np.short, np.intc, np.int_] + def integer_model(self, features): + return np.sum(features, axis = 1) + + # Model for learn to use when dtype = [np.single, np.double] + def decimal_model(self, features): + return np.prod(features, axis = 1) + + # Test byte data types + def test_byte(self): + self.base_test(source = self.byte_table, model = self.integer_model, np_dtype = np.byte) + + # Test short data types + def test_short(self): + self.base_test(source = self.short_table, model = self.integer_model, np_dtype = np.short) + + # Test int data types + def test_int(self): + self.base_test(source = self.int_table, model = self.integer_model, np_dtype = np.intc) + + # Test long data types + def test_long(self): + self.base_test(source = self.long_table, model = self.integer_model, np_dtype = np.int_) + + # Test float data types + def test_float(self): + self.base_test(source = self.float_table, model = self.decimal_model, np_dtype = np.single) + + # Test double data types + def test_double(self): + self.base_test(source = self.double_table, model = self.decimal_model, np_dtype = np.double) + + # The base test, which other tests will be built from + def base_test(self, source, model, np_dtype): + + rows = source.getRowSet() + cols = [source.getColumnSource(col) for col in ["X", "Y", "Z"]] + + gatherer_rowmajor = lambda rowset, colset : gather.table_to_numpy_2d(rowset, colset, gather.MemoryLayout.ROW_MAJOR, np_dtype) + gatherer_colmajor = lambda rowset, colset : gather.table_to_numpy_2d(rowset, 
colset, gather.MemoryLayout.COLUMN_MAJOR, np_dtype) + + array_from_table = tableToDataFrame(source).values + + gathered_rowmajor = gatherer_rowmajor(rows, cols) + gathered_colmajor = gatherer_colmajor(rows, cols) + + with self.subTest(msg = "Array shape"): + self.assertTrue(gathered_rowmajor.shape == array_from_table.shape) + print("Row major gathered shape: {}".format(gathered_rowmajor.shape)) + self.assertTrue(gathered_colmajor.shape == array_from_table.shape) + print("Column major gathered shape: {}".format(gathered_colmajor.shape)) + with self.subTest(msg = "Values in array"): + self.assertTrue(np.allclose(gathered_rowmajor, array_from_table)) + print("All row-major array values are equal") + self.assertTrue(np.allclose(gathered_colmajor, array_from_table)) + print("All column-major array values are equal") + with self.subTest(msg = "Array data type"): + self.assertTrue(gathered_rowmajor.dtype == np_dtype) + self.assertTrue(gathered_rowmajor.dtype == array_from_table.dtype) + self.assertTrue(gathered_colmajor.dtype == np_dtype) + self.assertTrue(gathered_colmajor.dtype == array_from_table.dtype) + self.assertTrue(gathered_rowmajor.dtype == gathered_colmajor.dtype) + print("Array dtype: {}".format(np_dtype)) + with self.subTest(msg = "Contiguity"): + self.assertTrue(gathered_rowmajor.flags["C_CONTIGUOUS"] or gathered_rowmajor.flags["F_CONTIGUOUS"]) + self.assertTrue(gathered_colmajor.flags["C_CONTIGUOUS"] or gathered_colmajor.flags["F_CONTIGUOUS"]) + print("Array contiguity checked") \ No newline at end of file diff --git a/Integrations/src/main/java/io/deephaven/integrations/learn/gather/NumPy.java b/Integrations/src/main/java/io/deephaven/integrations/learn/gather/NumPy.java new file mode 100644 index 00000000000..d89a3eb0c27 --- /dev/null +++ b/Integrations/src/main/java/io/deephaven/integrations/learn/gather/NumPy.java @@ -0,0 +1,670 @@ +package io.deephaven.integrations.learn.gather; + +import io.deephaven.chunk.*; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.engine.rowset.RowSequence; +import io.deephaven.engine.table.ChunkSource; +import io.deephaven.engine.table.ColumnSource; +import io.deephaven.engine.table.SharedContext; +import io.deephaven.util.SafeCloseableList; + +/** + * Gatherer takes Deephaven columnar data and places it into a buffer to be used by Python. The Python object will take + * data from the buffer and use it to construct a 2d array of specified size. + */ +public class NumPy { + + private static final int COPY_CHUNK_SIZE = 2048; + + /** + * Copy data from a table into a 2d tensor of Booleans. + * + * @param rowSeq indices of the rows of the table to put into the tensor + * @param columnSources columns of data to put into the tensor + * @param columnMajorOrder true to return a column-major array; false to return a row-major array + * @return contiguous RAM allocated for the tensor + */ + public static boolean[] tensorBuffer2DBoolean(final RowSequence rowSeq, final ColumnSource[] columnSources, + boolean columnMajorOrder) { + if (columnMajorOrder) { + boolean[] tensor = tensorBuffer2DBooleanColumnMajor(rowSeq, columnSources); + return tensor; + } else { + boolean[] tensor = tensorBuffer2DBooleanRowMajor(rowSeq, columnSources); + return tensor; + } + } + + /** + * Copy data from a table into a 2d tensor of Booleans in column-major order. 
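Condensed, `base_test` above amounts to the following round-trip check; this sketch assumes the same initialized-JVM environment the tests note at the top of the file.

```python
# The essence of base_test: gathered arrays must match a DataFrame
# round-trip of the same table in shape, values, and dtype.
import numpy as np
from deephaven import TableTools, tableToDataFrame
from deephaven.learn import gather

t = TableTools.emptyTable(100).update("X = (double)i", "Y = sqrt(X)", "Z = sqrt(Y)")
rows = t.getRowSet()
cols = [t.getColumnSource(c) for c in ["X", "Y", "Z"]]

gathered = gather.table_to_numpy_2d(rows, cols, gather.MemoryLayout.COLUMN_MAJOR, np.double)
expected = tableToDataFrame(t).values
assert gathered.shape == expected.shape == (100, 3)
assert np.allclose(gathered, expected)
```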
diff --git a/Integrations/src/main/java/io/deephaven/integrations/learn/gather/NumPy.java b/Integrations/src/main/java/io/deephaven/integrations/learn/gather/NumPy.java
new file mode 100644
index 00000000000..d89a3eb0c27
--- /dev/null
+++ b/Integrations/src/main/java/io/deephaven/integrations/learn/gather/NumPy.java
@@ -0,0 +1,670 @@
+package io.deephaven.integrations.learn.gather;
+
+import io.deephaven.chunk.*;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.engine.rowset.RowSequence;
+import io.deephaven.engine.table.ChunkSource;
+import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.SharedContext;
+import io.deephaven.util.SafeCloseableList;
+
+/**
+ * Gatherer takes Deephaven columnar data and places it into a buffer to be used by Python. The Python object will take
+ * data from the buffer and use it to construct a 2d array of specified size.
+ */
+public class NumPy {
+
+    private static final int COPY_CHUNK_SIZE = 2048;
+
+    /**
+     * Copy data from a table into a 2d tensor of booleans.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @param columnMajorOrder true to return a column-major array; false to return a row-major array
+     * @return contiguous RAM allocated for the tensor
+     */
+    public static boolean[] tensorBuffer2DBoolean(final RowSequence rowSeq, final ColumnSource[] columnSources,
+            boolean columnMajorOrder) {
+        if (columnMajorOrder) {
+            boolean[] tensor = tensorBuffer2DBooleanColumnMajor(rowSeq, columnSources);
+            return tensor;
+        } else {
+            boolean[] tensor = tensorBuffer2DBooleanRowMajor(rowSeq, columnSources);
+            return tensor;
+        }
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of booleans in column-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static boolean[] tensorBuffer2DBooleanColumnMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final boolean[] tensor = new boolean[nRows * nCols];
+
+        try (final ResettableWritableBooleanChunk valueChunk =
+                ResettableWritableBooleanChunk.makeResettableChunk();
+                final SharedContext sharedContext = SharedContext.makeSharedContext()) {
+
+            for (int ci = 0; ci < nCols; ++ci) {
+                valueChunk.resetFromArray(tensor, ci * nRows, nRows);
+                final ColumnSource colSrc = columnSources[ci];
+
+                try (final ChunkSource.FillContext fillContext = colSrc.makeFillContext(nRows, sharedContext)) {
+                    // noinspection unchecked
+                    colSrc.fillChunk(fillContext, valueChunk, rowSeq);
+                }
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of booleans in row-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static boolean[] tensorBuffer2DBooleanRowMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final boolean[] tensor = new boolean[nRows * nCols];
+
+        try (final SafeCloseableList toClose = new SafeCloseableList()) {
+            final RowSequence.Iterator rowKeys = toClose.add(rowSeq.getRowSequenceIterator());
+            final SharedContext inputSharedContext = toClose.add(SharedContext.makeSharedContext());
+            final ChunkSource.GetContext[] inputContexts = toClose.addArray(new ChunkSource.GetContext[nCols]);
+            for (int ci = 0; ci < nCols; ++ci) {
+                inputContexts[ci] = columnSources[ci].makeGetContext(COPY_CHUNK_SIZE, inputSharedContext);
+            }
+
+            // noinspection unchecked
+            final BooleanChunk[] inputColumnValues = new BooleanChunk[nCols];
+            int ti = 0;
+            while (rowKeys.hasMore()) {
+                final RowSequence sliceRowKeys = rowKeys.getNextRowSequenceWithLength(COPY_CHUNK_SIZE);
+                for (int ci = 0; ci < nCols; ++ci) {
+                    inputColumnValues[ci] =
+                            columnSources[ci].getChunk(inputContexts[ci], sliceRowKeys).asBooleanChunk();
+                }
+
+                final int sliceChunkSize = sliceRowKeys.intSize();
+                for (int ri = 0; ri < sliceChunkSize; ++ri) {
+                    for (int ci = 0; ci < nCols; ++ci) {
+                        tensor[ti++] = inputColumnValues[ci].get(ri);
+                    }
+                }
+                inputSharedContext.reset();
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of bytes.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @param columnMajorOrder true to return a column-major array; false to return a row-major array
+     * @return contiguous RAM allocated for the tensor
+     */
+    public static byte[] tensorBuffer2DByte(final RowSequence rowSeq, final ColumnSource[] columnSources,
+            boolean columnMajorOrder) {
+        if (columnMajorOrder) {
+            byte[] tensor = tensorBuffer2DByteColumnMajor(rowSeq, columnSources);
+            return tensor;
+        } else {
+            byte[] tensor = tensorBuffer2DByteRowMajor(rowSeq, columnSources);
+            return tensor;
+        }
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of bytes in column-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static byte[] tensorBuffer2DByteColumnMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final byte[] tensor = new byte[nRows * nCols];
+
+        try (final ResettableWritableByteChunk valueChunk =
+                ResettableWritableByteChunk.makeResettableChunk();
+                final SharedContext sharedContext = SharedContext.makeSharedContext()) {
+
+            for (int ci = 0; ci < nCols; ++ci) {
+                valueChunk.resetFromArray(tensor, ci * nRows, nRows);
+                final ColumnSource colSrc = columnSources[ci];
+
+                try (final ChunkSource.FillContext fillContext = colSrc.makeFillContext(nRows, sharedContext)) {
+                    // noinspection unchecked
+                    colSrc.fillChunk(fillContext, valueChunk, rowSeq);
+                }
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of bytes in row-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static byte[] tensorBuffer2DByteRowMajor(final RowSequence rowSeq, final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final byte[] tensor = new byte[nRows * nCols];
+
+        try (final SafeCloseableList toClose = new SafeCloseableList()) {
+            final RowSequence.Iterator rowKeys = toClose.add(rowSeq.getRowSequenceIterator());
+            final SharedContext inputSharedContext = toClose.add(SharedContext.makeSharedContext());
+            final ChunkSource.GetContext[] inputContexts = toClose.addArray(new ChunkSource.GetContext[nCols]);
+            for (int ci = 0; ci < nCols; ++ci) {
+                inputContexts[ci] = columnSources[ci].makeGetContext(COPY_CHUNK_SIZE, inputSharedContext);
+            }
+
+            // noinspection unchecked
+            final ByteChunk[] inputColumnValues = new ByteChunk[nCols];
+            int ti = 0;
+            while (rowKeys.hasMore()) {
+                final RowSequence sliceRowKeys = rowKeys.getNextRowSequenceWithLength(COPY_CHUNK_SIZE);
+                for (int ci = 0; ci < nCols; ++ci) {
+                    inputColumnValues[ci] = columnSources[ci].getChunk(inputContexts[ci], sliceRowKeys).asByteChunk();
+                }
+
+                final int sliceChunkSize = sliceRowKeys.intSize();
+                for (int ri = 0; ri < sliceChunkSize; ++ri) {
+                    for (int ci = 0; ci < nCols; ++ci) {
+                        tensor[ti++] = inputColumnValues[ci].get(ri);
+                    }
+                }
+                inputSharedContext.reset();
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of shorts.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @param columnMajorOrder true to return a column-major array; false to return a row-major array
+     * @return contiguous RAM allocated for the tensor
+     */
+    public static short[] tensorBuffer2DShort(final RowSequence rowSeq, final ColumnSource[] columnSources,
+            boolean columnMajorOrder) {
+        if (columnMajorOrder) {
+            short[] tensor = tensorBuffer2DShortColumnMajor(rowSeq, columnSources);
+            return tensor;
+        } else {
+            short[] tensor = tensorBuffer2DShortRowMajor(rowSeq, columnSources);
+            return tensor;
+        }
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of shorts in column-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+
+    private static short[] tensorBuffer2DShortColumnMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final short[] tensor = new short[nRows * nCols];
+
+        try (final ResettableWritableShortChunk valueChunk =
+                ResettableWritableShortChunk.makeResettableChunk();
+                final SharedContext sharedContext = SharedContext.makeSharedContext()) {
+
+            for (int ci = 0; ci < nCols; ++ci) {
+                valueChunk.resetFromArray(tensor, ci * nRows, nRows);
+                final ColumnSource colSrc = columnSources[ci];
+
+                try (final ChunkSource.FillContext fillContext = colSrc.makeFillContext(nRows, sharedContext)) {
+                    // noinspection unchecked
+                    colSrc.fillChunk(fillContext, valueChunk, rowSeq);
+                }
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of shorts in row-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static short[] tensorBuffer2DShortRowMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final short[] tensor = new short[nRows * nCols];
+
+        try (final SafeCloseableList toClose = new SafeCloseableList()) {
+            final RowSequence.Iterator rowKeys = toClose.add(rowSeq.getRowSequenceIterator());
+            final SharedContext inputSharedContext = toClose.add(SharedContext.makeSharedContext());
+            final ChunkSource.GetContext[] inputContexts = toClose.addArray(new ChunkSource.GetContext[nCols]);
+            for (int ci = 0; ci < nCols; ++ci) {
+                inputContexts[ci] = columnSources[ci].makeGetContext(COPY_CHUNK_SIZE, inputSharedContext);
+            }
+
+            // noinspection unchecked
+            final ShortChunk[] inputColumnValues = new ShortChunk[nCols];
+            int ti = 0;
+            while (rowKeys.hasMore()) {
+                final RowSequence sliceRowKeys = rowKeys.getNextRowSequenceWithLength(COPY_CHUNK_SIZE);
+                for (int ci = 0; ci < nCols; ++ci) {
+                    inputColumnValues[ci] = columnSources[ci].getChunk(inputContexts[ci], sliceRowKeys).asShortChunk();
+                }
+
+                final int sliceChunkSize = sliceRowKeys.intSize();
+                for (int ri = 0; ri < sliceChunkSize; ++ri) {
+                    for (int ci = 0; ci < nCols; ++ci) {
+                        tensor[ti++] = inputColumnValues[ci].get(ri);
+                    }
+                }
+                inputSharedContext.reset();
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of ints.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @param columnMajorOrder true to return a column-major array; false to return a row-major array
+     * @return contiguous RAM allocated for the tensor
+     */
+    public static int[] tensorBuffer2DInt(final RowSequence rowSeq, final ColumnSource[] columnSources,
+            boolean columnMajorOrder) {
+        if (columnMajorOrder) {
+            int[] tensor = tensorBuffer2DIntColumnMajor(rowSeq, columnSources);
+            return tensor;
+        } else {
+            int[] tensor = tensorBuffer2DIntRowMajor(rowSeq, columnSources);
+            return tensor;
+        }
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of ints in column-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static int[] tensorBuffer2DIntColumnMajor(final RowSequence rowSeq, final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final int[] tensor = new int[nRows * nCols];
+
+        try (final ResettableWritableIntChunk valueChunk =
+                ResettableWritableIntChunk.makeResettableChunk();
+                final SharedContext sharedContext = SharedContext.makeSharedContext()) {
+
+            for (int ci = 0; ci < nCols; ++ci) {
+                valueChunk.resetFromArray(tensor, ci * nRows, nRows);
+                final ColumnSource colSrc = columnSources[ci];
+
+                try (final ChunkSource.FillContext fillContext = colSrc.makeFillContext(nRows, sharedContext)) {
+                    // noinspection unchecked
+                    colSrc.fillChunk(fillContext, valueChunk, rowSeq);
+                }
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of ints in row-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static int[] tensorBuffer2DIntRowMajor(final RowSequence rowSeq, final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final int[] tensor = new int[nRows * nCols];
+
+        try (final SafeCloseableList toClose = new SafeCloseableList()) {
+            final RowSequence.Iterator rowKeys = toClose.add(rowSeq.getRowSequenceIterator());
+            final SharedContext inputSharedContext = toClose.add(SharedContext.makeSharedContext());
+            final ChunkSource.GetContext[] inputContexts = toClose.addArray(new ChunkSource.GetContext[nCols]);
+            for (int ci = 0; ci < nCols; ++ci) {
+                inputContexts[ci] = columnSources[ci].makeGetContext(COPY_CHUNK_SIZE, inputSharedContext);
+            }
+
+            // noinspection unchecked
+            final IntChunk[] inputColumnValues = new IntChunk[nCols];
+            int ti = 0;
+            while (rowKeys.hasMore()) {
+                final RowSequence sliceRowKeys = rowKeys.getNextRowSequenceWithLength(COPY_CHUNK_SIZE);
+                for (int ci = 0; ci < nCols; ++ci) {
+                    inputColumnValues[ci] = columnSources[ci].getChunk(inputContexts[ci], sliceRowKeys).asIntChunk();
+                }
+
+                final int sliceChunkSize = sliceRowKeys.intSize();
+                for (int ri = 0; ri < sliceChunkSize; ++ri) {
+                    for (int ci = 0; ci < nCols; ++ci) {
+                        tensor[ti++] = inputColumnValues[ci].get(ri);
+                    }
+                }
+                inputSharedContext.reset();
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of longs.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @param columnMajorOrder true to return a column-major array; false to return a row-major array
+     * @return contiguous RAM allocated for the tensor
+     */
+    public static long[] tensorBuffer2DLong(final RowSequence rowSeq, final ColumnSource[] columnSources,
+            boolean columnMajorOrder) {
+        if (columnMajorOrder) {
+            long[] tensor = tensorBuffer2DLongColumnMajor(rowSeq, columnSources);
+            return tensor;
+        } else {
+            long[] tensor = tensorBuffer2DLongRowMajor(rowSeq, columnSources);
+            return tensor;
+        }
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of longs in column-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+
+    private static long[] tensorBuffer2DLongColumnMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final long[] tensor = new long[nRows * nCols];
+
+        try (final ResettableWritableLongChunk valueChunk =
+                ResettableWritableLongChunk.makeResettableChunk();
+                final SharedContext sharedContext = SharedContext.makeSharedContext()) {
+
+            for (int ci = 0; ci < nCols; ++ci) {
+                valueChunk.resetFromArray(tensor, ci * nRows, nRows);
+                final ColumnSource colSrc = columnSources[ci];
+
+                try (final ChunkSource.FillContext fillContext = colSrc.makeFillContext(nRows, sharedContext)) {
+                    // noinspection unchecked
+                    colSrc.fillChunk(fillContext, valueChunk, rowSeq);
+                }
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of longs in row-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static long[] tensorBuffer2DLongRowMajor(final RowSequence rowSeq, final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final long[] tensor = new long[nRows * nCols];
+
+        try (final SafeCloseableList toClose = new SafeCloseableList()) {
+            final RowSequence.Iterator rowKeys = toClose.add(rowSeq.getRowSequenceIterator());
+            final SharedContext inputSharedContext = toClose.add(SharedContext.makeSharedContext());
+            final ChunkSource.GetContext[] inputContexts = toClose.addArray(new ChunkSource.GetContext[nCols]);
+            for (int ci = 0; ci < nCols; ++ci) {
+                inputContexts[ci] = columnSources[ci].makeGetContext(COPY_CHUNK_SIZE, inputSharedContext);
+            }
+
+            // noinspection unchecked
+            final LongChunk[] inputColumnValues = new LongChunk[nCols];
+            int ti = 0;
+            while (rowKeys.hasMore()) {
+                final RowSequence sliceRowKeys = rowKeys.getNextRowSequenceWithLength(COPY_CHUNK_SIZE);
+                for (int ci = 0; ci < nCols; ++ci) {
+                    inputColumnValues[ci] = columnSources[ci].getChunk(inputContexts[ci], sliceRowKeys).asLongChunk();
+                }
+
+                final int sliceChunkSize = sliceRowKeys.intSize();
+                for (int ri = 0; ri < sliceChunkSize; ++ri) {
+                    for (int ci = 0; ci < nCols; ++ci) {
+                        tensor[ti++] = inputColumnValues[ci].get(ri);
+                    }
+                }
+                inputSharedContext.reset();
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of floats.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @param columnMajorOrder true to return a column-major array; false to return a row-major array
+     * @return contiguous RAM allocated for the tensor
+     */
+    public static float[] tensorBuffer2DFloat(final RowSequence rowSeq, final ColumnSource[] columnSources,
+            boolean columnMajorOrder) {
+        if (columnMajorOrder) {
+            float[] tensor = tensorBuffer2DFloatColumnMajor(rowSeq, columnSources);
+            return tensor;
+        } else {
+            float[] tensor = tensorBuffer2DFloatRowMajor(rowSeq, columnSources);
+            return tensor;
+        }
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of floats in column-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+
+    private static float[] tensorBuffer2DFloatColumnMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final float[] tensor = new float[nRows * nCols];
+
+        try (final ResettableWritableFloatChunk valueChunk =
+                ResettableWritableFloatChunk.makeResettableChunk();
+                final SharedContext sharedContext = SharedContext.makeSharedContext()) {
+
+            for (int ci = 0; ci < nCols; ++ci) {
+                valueChunk.resetFromArray(tensor, ci * nRows, nRows);
+                final ColumnSource colSrc = columnSources[ci];
+
+                try (final ChunkSource.FillContext fillContext = colSrc.makeFillContext(nRows, sharedContext)) {
+                    // noinspection unchecked
+                    colSrc.fillChunk(fillContext, valueChunk, rowSeq);
+                }
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of floats in row-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static float[] tensorBuffer2DFloatRowMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final float[] tensor = new float[nRows * nCols];
+
+        try (final SafeCloseableList toClose = new SafeCloseableList()) {
+            final RowSequence.Iterator rowKeys = toClose.add(rowSeq.getRowSequenceIterator());
+            final SharedContext inputSharedContext = toClose.add(SharedContext.makeSharedContext());
+            final ChunkSource.GetContext[] inputContexts = toClose.addArray(new ChunkSource.GetContext[nCols]);
+            for (int ci = 0; ci < nCols; ++ci) {
+                inputContexts[ci] = columnSources[ci].makeGetContext(COPY_CHUNK_SIZE, inputSharedContext);
+            }
+
+            // noinspection unchecked
+            final FloatChunk[] inputColumnValues = new FloatChunk[nCols];
+            int ti = 0;
+            while (rowKeys.hasMore()) {
+                final RowSequence sliceRowKeys = rowKeys.getNextRowSequenceWithLength(COPY_CHUNK_SIZE);
+                for (int ci = 0; ci < nCols; ++ci) {
+                    inputColumnValues[ci] = columnSources[ci].getChunk(inputContexts[ci], sliceRowKeys).asFloatChunk();
+                }
+
+                final int sliceChunkSize = sliceRowKeys.intSize();
+                for (int ri = 0; ri < sliceChunkSize; ++ri) {
+                    for (int ci = 0; ci < nCols; ++ci) {
+                        tensor[ti++] = inputColumnValues[ci].get(ri);
+                    }
+                }
+                inputSharedContext.reset();
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of doubles.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @param columnMajorOrder true to return a column-major array; false to return a row-major array
+     * @return contiguous RAM allocated for the tensor
+     */
+    public static double[] tensorBuffer2DDouble(final RowSequence rowSeq, final ColumnSource[] columnSources,
+            boolean columnMajorOrder) {
+        if (columnMajorOrder) {
+            double[] tensor = tensorBuffer2DDoubleColumnMajor(rowSeq, columnSources);
+            return tensor;
+        } else {
+            double[] tensor = tensorBuffer2DDoubleRowMajor(rowSeq, columnSources);
+            return tensor;
+        }
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of doubles in column-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static double[] tensorBuffer2DDoubleColumnMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final double[] tensor = new double[nRows * nCols];
+
+        try (final ResettableWritableDoubleChunk valueChunk =
+                ResettableWritableDoubleChunk.makeResettableChunk();
+                final SharedContext sharedContext = SharedContext.makeSharedContext()) {
+
+            for (int ci = 0; ci < nCols; ++ci) {
+                valueChunk.resetFromArray(tensor, ci * nRows, nRows);
+                final ColumnSource colSrc = columnSources[ci];
+
+                try (final ChunkSource.FillContext fillContext = colSrc.makeFillContext(nRows, sharedContext)) {
+                    // noinspection unchecked
+                    colSrc.fillChunk(fillContext, valueChunk, rowSeq);
+                }
+            }
+        }
+
+        return tensor;
+    }
+
+    /**
+     * Copy data from a table into a 2d tensor of doubles in row-major order.
+     *
+     * @param rowSeq indices of the rows of the table to put into the tensor
+     * @param columnSources columns of data to put into the tensor
+     * @return contiguous RAM allocated for the tensor
+     */
+    private static double[] tensorBuffer2DDoubleRowMajor(final RowSequence rowSeq,
+            final ColumnSource[] columnSources) {
+        final int nRows = rowSeq.intSize();
+        final int nCols = columnSources.length;
+        final double[] tensor = new double[nRows * nCols];
+
+        try (final SafeCloseableList toClose = new SafeCloseableList()) {
+            final RowSequence.Iterator rowKeys = toClose.add(rowSeq.getRowSequenceIterator());
+            final SharedContext inputSharedContext = toClose.add(SharedContext.makeSharedContext());
+            final ChunkSource.GetContext[] inputContexts = toClose.addArray(new ChunkSource.GetContext[nCols]);
+            for (int ci = 0; ci < nCols; ++ci) {
+                inputContexts[ci] = columnSources[ci].makeGetContext(COPY_CHUNK_SIZE, inputSharedContext);
+            }
+
+            // noinspection unchecked
+            final DoubleChunk[] inputColumnValues = new DoubleChunk[nCols];
+            int ti = 0;
+            while (rowKeys.hasMore()) {
+                final RowSequence sliceRowKeys = rowKeys.getNextRowSequenceWithLength(COPY_CHUNK_SIZE);
+                for (int ci = 0; ci < nCols; ++ci) {
+                    inputColumnValues[ci] = columnSources[ci].getChunk(inputContexts[ci], sliceRowKeys).asDoubleChunk();
+                }
+
+                final int sliceChunkSize = sliceRowKeys.intSize();
+                for (int ri = 0; ri < sliceChunkSize; ++ri) {
+                    for (int ci = 0; ci < nCols; ++ci) {
+                        tensor[ti++] = inputColumnValues[ci].get(ri);
+                    }
+                }
+                inputSharedContext.reset();
+            }
+        }
+
+        return tensor;
+    }
+
+}
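The two buffer layouts these Java methods produce can be illustrated with plain NumPy. This standalone sketch mirrors the reshape/transpose logic in `table_to_numpy_2d` and uses made-up data in place of an actual Java buffer: a column-major buffer concatenates whole columns, while a row-major buffer interleaves columns row by row.

```python
# Layout contract of the Java tensor buffers, shown with plain NumPy.
import numpy as np

n_rows, n_cols = 4, 2
col_major = np.array([1, 2, 3, 4, 5, 6, 7, 8])  # buffer[ci * n_rows + ri]
row_major = np.array([1, 5, 2, 6, 3, 7, 4, 8])  # buffer[ri * n_cols + ci]

a = col_major.reshape(n_cols, n_rows).T  # shape (4, 2); F-contiguous view
b = row_major.reshape(n_rows, n_cols)    # shape (4, 2); C-contiguous
assert (a == b).all()
assert a.flags["F_CONTIGUOUS"] and b.flags["C_CONTIGUOUS"]
```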
diff --git a/Integrations/src/test/java/io/deephaven/integrations/learn/gather/NumPyTest.java b/Integrations/src/test/java/io/deephaven/integrations/learn/gather/NumPyTest.java
new file mode 100644
index 00000000000..ca6b1969e55
--- /dev/null
+++ b/Integrations/src/test/java/io/deephaven/integrations/learn/gather/NumPyTest.java
@@ -0,0 +1,167 @@
+package io.deephaven.integrations.learn.gather;
+
+import io.deephaven.engine.rowset.RowSequence;
+import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.impl.InMemoryTable;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+public class NumPyTest {
+
+    private static InMemoryTable table;
+    private static final String[] boolColNames = {"bool1", "bool2"};
+    private static final boolean[][] boolData = {
+            new boolean[] {true, true, false, false},
+            new boolean[] {true, false, true, false}
+    };
+    private static final String[] byteColNames = {"byte1", "byte2"};
+    private static final byte[][] byteData = {
+            new byte[] {(byte) 1, (byte) 2, (byte) 3, (byte) 4},
+            new byte[] {(byte) 5, (byte) 6, (byte) 7, (byte) 8}
+    };
+    private static final String[] shortColNames = {"short1", "short2"};
+    private static final short[][] shortData = {
+            new short[] {(short) -1, (short) -2, (short) -3, (short) -4},
+            new short[] {(short) -5, (short) -6, (short) -7, (short) -8}
+    };
+    private static final String[] intColNames = {"int1", "int2"};
+    private static final int[][] intData = {
+            new int[] {100, 200, -100, -200},
+            new int[] {-300, -400, 300, 400},
+    };
+    private static final String[] longColNames = {"long1", "long2"};
+    private static final long[][] longData = {
+            new long[] {1L, 100L, 10000L, 1000000L},
+            new long[] {9L, 999L, 99999L, 9999999L},
+    };
+    private static final String[] floatColNames = {"float1", "float2"};
+    private static final float[][] floatData = {
+            new float[] {3.14F, 2.73F, 1.5F, 0.63F},
+            new float[] {0.1F, 0.2F, 0.3F, 0.4F},
+    };
+    private static final String[] doubleColNames = {"double1", "double2"};
+    private static final double[][] doubleData = {
+            new double[] {3.14, 2.73, 1.5, 0.63},
+            new double[] {0.1, 0.2, 0.3, 0.4}
+    };
+    private static final String[] columnNames = new String[] {
+            boolColNames[0], boolColNames[1],
+            byteColNames[0], byteColNames[1],
+            shortColNames[0], shortColNames[1],
+            intColNames[0], intColNames[1],
+            longColNames[0], longColNames[1],
+            floatColNames[0], floatColNames[1],
+            doubleColNames[0], doubleColNames[1]
+    };
+    private static final Object[] columnData = new Object[] {
+            boolData[0], boolData[1],
+            byteData[0], byteData[1],
+            shortData[0], shortData[1],
+            intData[0], intData[1],
+            longData[0], longData[1],
+            floatData[0], floatData[1],
+            doubleData[0], doubleData[1]
+    };
+
+    @BeforeClass
+    public static void setup() {
+        table = new InMemoryTable(columnNames, columnData);
+    }
+
+    public static ColumnSource[] getColSet(final String[] colNames) {
+        ColumnSource[] rst = new ColumnSource[2];
+
+        for (int i = 0; i < 2; i++) {
+            rst[i] = table.getColumnSource(colNames[i]);
+        }
+
+        return rst;
+    }
+
+    private static void assertRowMajor(BiFunction expected, Function actual) {
+        // Data should be stored in row-major order
+        int idx = 0;
+        for (int j = 0; j < 4; j++) {
+            for (int i = 0; i < 2; i++) {
+                Assert.assertEquals("i=" + i + " j=" + j, expected.apply(i, j), actual.apply(idx));
+                idx++;
+            }
+        }
+    }
+
+
+    private static void assertColumnMajor(BiFunction expected, Function actual) {
+        // Data should be stored in column-major order
+        int idx = 0;
+        for (int i = 0; i < 2; i++) {
+            for (int j = 0; j < 4; j++) {
+                Assert.assertEquals("i=" + i + " j=" + j, expected.apply(i, j), actual.apply(idx));
+                idx++;
+            }
+        }
+    }
+
+    @Test
+    public void byteTestMethod() {
+        RowSequence rowSet = table.getRowSet();
+        ColumnSource[] colSet = getColSet(byteColNames);
+        byte[] resultColumnMajor = NumPy.tensorBuffer2DByte(rowSet, colSet, true);
+        assertColumnMajor((i, j) -> byteData[i][j], i -> resultColumnMajor[i]);
+        byte[] resultRowMajor = NumPy.tensorBuffer2DByte(rowSet, colSet, false);
+        assertRowMajor((i, j) -> byteData[i][j], i -> resultRowMajor[i]);
+    }
+
+    @Test
+    public void shortTestMethod() {
+        RowSequence rowSet = table.getRowSet();
+        ColumnSource[] colSet = getColSet(shortColNames);
+        short[] resultColumnMajor = NumPy.tensorBuffer2DShort(rowSet, colSet, true);
+        assertColumnMajor((i, j) -> shortData[i][j], i -> resultColumnMajor[i]);
+        short[] resultRowMajor = NumPy.tensorBuffer2DShort(rowSet, colSet, false);
+        assertRowMajor((i, j) -> shortData[i][j], i -> resultRowMajor[i]);
+    }
+
+    @Test
+    public void intTestMethod() {
+        RowSequence rowSet = table.getRowSet();
+        ColumnSource[] colSet = getColSet(intColNames);
+        int[] resultColumnMajor = NumPy.tensorBuffer2DInt(rowSet, colSet, true);
+        assertColumnMajor((i, j) -> intData[i][j], i -> resultColumnMajor[i]);
+        int[] resultRowMajor = NumPy.tensorBuffer2DInt(rowSet, colSet, false);
+        assertRowMajor((i, j) -> intData[i][j], i -> resultRowMajor[i]);
+    }
+
+    @Test
+    public void longTestMethod() {
+        RowSequence rowSet = table.getRowSet();
+        ColumnSource[] colSet = getColSet(longColNames);
+        long[] resultColumnMajor = NumPy.tensorBuffer2DLong(rowSet, colSet, true);
+        assertColumnMajor((i, j) -> longData[i][j], i -> resultColumnMajor[i]);
+        long[] resultRowMajor = NumPy.tensorBuffer2DLong(rowSet, colSet, false);
+        assertRowMajor((i, j) -> longData[i][j], i -> resultRowMajor[i]);
+    }
+
+    @Test
+    public void floatTestMethod() {
+        RowSequence rowSet = table.getRowSet();
+        ColumnSource[] colSet = getColSet(floatColNames);
+        float[] resultColumnMajor = NumPy.tensorBuffer2DFloat(rowSet, colSet, true);
+        assertColumnMajor((i, j) -> floatData[i][j], i -> resultColumnMajor[i]);
+        float[] resultRowMajor = NumPy.tensorBuffer2DFloat(rowSet, colSet, false);
+        assertRowMajor((i, j) -> floatData[i][j], i -> resultRowMajor[i]);
+    }
+
+    @Test
+    public void doubleTestMethod() {
+        RowSequence rowSet = table.getRowSet();
+        ColumnSource[] colSet = getColSet(doubleColNames);
+        double[] resultColumnMajor = NumPy.tensorBuffer2DDouble(rowSet, colSet, true);
+        assertColumnMajor((i, j) -> doubleData[i][j], i -> resultColumnMajor[i]);
+        double[] resultRowMajor = NumPy.tensorBuffer2DDouble(rowSet, colSet, false);
+        assertRowMajor((i, j) -> doubleData[i][j], i -> resultRowMajor[i]);
+    }
+
+}
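The index arithmetic that `assertRowMajor` and `assertColumnMajor` verify can be restated compactly. This sketch mirrors the `intData` fixture above, where `data[ci][ri]` holds column `ci`, row `ri` of a 2-column, 4-row table.

```python
# Index math checked by assertRowMajor/assertColumnMajor, restated in Python.
data = [[100, 200, -100, -200],   # column int1
        [-300, -400, 300, 400]]   # column int2

row_major = [data[ci][ri] for ri in range(4) for ci in range(2)]
col_major = [data[ci][ri] for ci in range(2) for ri in range(4)]

assert row_major == [100, -300, 200, -400, -100, 300, -200, 400]
assert col_major == [100, 200, -100, -200, -300, -400, 300, 400]
```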