Barrage/web UI support for BigDecimal and BigInteger. (#1627)
* Adding decimal support for parquet writing; fixing nested avro; start of more general decimal support.

* Remove mistakenly added file. build.gradle.compactstringexperiment

* Fixes for BigDecimal and BigInteger in the web UI.

* More comments.

* Renamed DatetimeAsLongCS to DateTime...

* Added a comment to explain computedCache.

* Slight improvement to BigDecimal codec detection.

* Followup to review comments.

* Followup to review comments.

* Fixed int32 and int64 based parquet decimal reading.

* Spotlessify.

* Followup to review comments.
jcferretti authored Dec 8, 2021
1 parent d8ca845 commit 6c329b3
Showing 25 changed files with 1,027 additions and 103 deletions.
5 changes: 4 additions & 1 deletion Integrations/python/deephaven/Types.py
@@ -48,6 +48,7 @@
float64 = None
string = None
bigdecimal = None
biginteger = None
stringset = None
datetime = None
timeperiod = None
@@ -79,7 +80,7 @@ def _defineSymbols():
_qst_column_, _qst_newtable_, _qst_type_, _table_, \
DataType, bool_, byte, short, int16, char, int_, int32, long_, int64, \
float_, single, float32, double, float64, \
string, bigdecimal, stringset, datetime, timeperiod, \
string, bigdecimal, biginteger, stringset, datetime, timeperiod, \
byte_array, short_array, int16_array, int_array, int32_array, long_array, int64_array, \
float_array, single_array, float32_array, double_array, float64_array, string_array, \
_type2jtype
@@ -114,6 +115,7 @@ def _defineSymbols():
float64 = double # make life simple for people who are used to pyarrow
string = DataType(_qst_type_.stringType())
bigdecimal = _typeFromJavaClassName('java.math.BigDecimal')
biginteger = _typeFromJavaClassName('java.math.BigInteger')
stringset = _typeFromJavaClassName('io.deephaven.stringset.StringSet')
datetime = _typeFromJavaClassName('io.deephaven.time.DateTime')
timeperiod = _typeFromJavaClassName('io.deephaven.time.Period')
@@ -143,6 +145,7 @@ def _defineSymbols():
double : jpy.get_type('double'),
string : jpy.get_type('java.lang.String'),
bigdecimal : jpy.get_type('java.math.BigDecimal'),
biginteger : jpy.get_type('java.math.BigInteger'),
stringset : jpy.get_type('io.deephaven.stringset.StringSet'),
datetime : jpy.get_type('io.deephaven.time.DateTime'),
timeperiod : jpy.get_type('io.deephaven.time.Period'),
30 changes: 22 additions & 8 deletions Integrations/python/test/testParquetTools.py
@@ -13,6 +13,7 @@
import shutil

from deephaven import TableTools, ParquetTools
import deephaven.Types as dh


if sys.version_info[0] < 3:
@@ -49,24 +50,18 @@ def testCreation(self):
shutil.rmtree(fileLocation)
if os.path.exists(fileLocation2):
shutil.rmtree(fileLocation2)
time.sleep(0.01) # avoid race condition on file existence...

# Writing
with self.subTest(msg="writeTable(Table, String)"):
ParquetTools.writeTable(table, fileLocation)
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)
time.sleep(0.01) # avoid race condition on file existence...
with self.subTest(msg="writeTable(Table, File)"):
ParquetTools.writeTable(table, ParquetTools.getFileObject(fileLocation))
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)
time.sleep(0.01) # avoid race condition on file existence...
with self.subTest(msg="writeTables(Table[], TableDefinition, File[]"):
ParquetTools.writeTables([table, table], definition, [fileLocation, fileLocation2])
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
self.assertTrue(os.path.exists(fileLocation2))

@@ -78,14 +73,33 @@ def testCreation(self):
with self.subTest(msg="delete(File)"):
if os.path.exists(fileLocation):
ParquetTools.deleteTable(fileLocation)
time.sleep(0.01) # avoid race condition on file existence...
self.assertFalse(os.path.exists(fileLocation))
if os.path.exists(fileLocation2):
ParquetTools.deleteTable(fileLocation2)
time.sleep(0.01) # avoid race condition on file existence...
self.assertFalse(os.path.exists(fileLocation2))
shutil.rmtree(baseDir)

def testDecimal(self):
jbigdecimal = jpy.get_type('java.math.BigDecimal')
table = dh.table_of([[jbigdecimal.valueOf(301, 2)],
[jbigdecimal.valueOf(201,2)],
[jbigdecimal.valueOf(101,2)]],
[('decimal_value', dh.bigdecimal)])
self.assertIsNotNone(table)
baseDir = os.path.join(self.rootDir, 'testCreation')
fileLocation = os.path.join(baseDir, 'table1.parquet')
if os.path.exists(fileLocation):
shutil.rmtree(fileLocation)

ParquetTools.writeTable(table, fileLocation)
table2 = ParquetTools.readTable(fileLocation)
self.assertEquals(table.size(), table2.size())
s = TableTools.diff(table, table2, 100)
self.assertEquals('', s)

self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)

@classmethod
def tearDownClass(cls):
# remove the junk definitions created in the tests, if they exist...
@@ -13,6 +13,7 @@
import org.jetbrains.annotations.Nullable;

import java.io.Externalizable;
import java.math.BigDecimal;

/**
* Utility class to concentrate {@link ObjectCodec} lookups.
@@ -37,8 +38,7 @@ public static boolean codecRequired(@NotNull final ColumnDefinition<?> columnDef
* @return Whether a codec is required
*/
public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable final Class<?> componentType) {
if (dataType.isPrimitive() || dataType == Boolean.class || dataType == DateTime.class
|| dataType == String.class || StringSet.class.isAssignableFrom(dataType)) {
if (dataType.isPrimitive() || noCodecRequired(dataType) || StringSet.class.isAssignableFrom(dataType)) {
// Primitive, basic, and special types do not require codecs
return false;
}
@@ -48,17 +48,15 @@ public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable
"Array type " + dataType + " does not match component type " + componentType);
}
// Arrays of primitives or basic types do not require codecs
return !(componentType.isPrimitive() || componentType == Boolean.class || componentType == DateTime.class
|| componentType == String.class);
return !(componentType.isPrimitive() || noCodecRequired(dataType));
}
if (Vector.class.isAssignableFrom(dataType)) {
if (componentType == null) {
throw new IllegalArgumentException("Vector type " + dataType + " requires a component type");
}
if (ObjectVector.class.isAssignableFrom(dataType)) {
// Vectors of basic types do not require codecs
return !(componentType == Boolean.class || componentType == DateTime.class
|| componentType == String.class);
return !noCodecRequired(dataType);
}
// VectorBases of primitive types do not require codecs
return false;
@@ -67,6 +65,17 @@ public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable
return true;
}

private static boolean noCodecRequired(@NotNull final Class<?> dataType) {
return dataType == Boolean.class ||
dataType == DateTime.class ||
dataType == String.class ||
// A BigDecimal column maps to a logical type of decimal, with
// appropriate precision and scale calculated from column data,
// unless the user explicitly requested something else
// via instructions.
dataType == BigDecimal.class;
}

/**
* Test whether an explicit codec has been set.
*
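The comment added to noCodecRequired above notes that, for Parquet writing, a BigDecimal column maps to a decimal logical type whose precision and scale are computed from the column's data unless overridden via instructions. As a rough standalone illustration only (this is not the Deephaven implementation, and the helper names are invented), one way to derive a single decimal(precision, scale) wide enough for every value in a column:

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical sketch: derive one decimal(precision, scale) that can hold every
// value of a BigDecimal column, as a parquet decimal logical type would need.
final class DecimalTypeSketch {
    static int[] precisionAndScale(final List<BigDecimal> column) {
        int maxScale = 0;
        int maxIntegerDigits = 0;
        for (final BigDecimal value : column) {
            if (value == null) {
                continue;
            }
            final BigDecimal v = value.stripTrailingZeros();
            maxScale = Math.max(maxScale, Math.max(v.scale(), 0));
            maxIntegerDigits = Math.max(maxIntegerDigits, v.precision() - v.scale());
        }
        // precision must cover the widest integer part plus the widest fractional part
        return new int[] {maxIntegerDigits + maxScale, maxScale};
    }

    public static void main(final String[] args) {
        // 3.01, 2.01, 1.01 -> decimal(3, 2), matching the testDecimal values above
        final int[] ps = precisionAndScale(List.of(
                new BigDecimal("3.01"), new BigDecimal("2.01"), new BigDecimal("1.01")));
        System.out.println("precision=" + ps[0] + ", scale=" + ps[1]);
    }
}
```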
@@ -2308,7 +2308,7 @@ static ColumnSource<?> maybeTransformToPrimitive(final ColumnSource<?> columnSou
} else {
// noinspection unchecked
final ColumnSource<DateTime> columnSourceAsDateTime = (ColumnSource<DateTime>) columnSource;
return new DatetimeAsLongColumnSource(columnSourceAsDateTime);
return new DateTimeAsLongColumnSource(columnSourceAsDateTime);
}
}

@@ -16,14 +16,14 @@
import org.jetbrains.annotations.NotNull;

/**
* Reinterpret result {@link ColumnSource} implementations that translates {@link Boolean} to {@code byte} values.
* Reinterpret result {@link ColumnSource} implementations that translates {@link DateTime} to {@code long} values.
*/
@AbstractColumnSource.IsSerializable(value = true)
public class DatetimeAsLongColumnSource extends AbstractColumnSource<Long> implements MutableColumnSourceGetDefaults.ForLong {
public class DateTimeAsLongColumnSource extends AbstractColumnSource<Long> implements MutableColumnSourceGetDefaults.ForLong {

private final ColumnSource<DateTime> alternateColumnSource;

public DatetimeAsLongColumnSource(@NotNull final ColumnSource<DateTime> alternateColumnSource) {
public DateTimeAsLongColumnSource(@NotNull final ColumnSource<DateTime> alternateColumnSource) {
super(long.class);
this.alternateColumnSource = alternateColumnSource;
}
@@ -18,7 +18,7 @@ public static ColumnSource<Long> dateTimeToLongSource(ColumnSource<?> source) {
return source.reinterpret(long.class);
} else {
// noinspection unchecked
return new DatetimeAsLongColumnSource((ColumnSource<DateTime>) source);
return new DateTimeAsLongColumnSource((ColumnSource<DateTime>) source);
}
}

@@ -5,6 +5,8 @@
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import com.google.common.io.LittleEndianDataOutputStream;
import com.google.common.primitives.Ints;
import gnu.trove.iterator.TLongIterator;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
@@ -18,6 +20,9 @@

import java.io.DataInput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Iterator;

public interface ChunkInputStreamGenerator extends SafeCloseable {
@@ -44,12 +49,29 @@ static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
case Object:
if (type.isArray()) {
return new VarListChunkInputStreamGenerator<>(type, chunk.asObjectChunk());
} else if (type == String.class) {
}
if (type == String.class) {
return new VarBinaryChunkInputStreamGenerator<>(String.class, chunk.asObjectChunk(), (out, str) -> {
out.write(str.getBytes(Charsets.UTF_8));
});
}
// TODO (core#513): BigDecimal, BigInteger
if (type == BigInteger.class) {
return new VarBinaryChunkInputStreamGenerator<>(BigInteger.class, chunk.asObjectChunk(), (out, item) -> {
out.write(item.toByteArray());
});
}
if (type == BigDecimal.class) {
return new VarBinaryChunkInputStreamGenerator<>(BigDecimal.class, chunk.asObjectChunk(), (out, item) -> {
final BigDecimal normal = item.stripTrailingZeros();
final int v = normal.scale();
// Write as little endian, arrow endianness.
out.write(0xFF & v);
out.write(0xFF & (v >> 8));
out.write(0xFF & (v >> 16));
out.write(0xFF & (v >> 24));
out.write(normal.unscaledValue().toByteArray());
});
}
// TODO (core#936): support column conversion modes

return new VarBinaryChunkInputStreamGenerator<>(type, chunk.asObjectChunk(), (out, item) -> {
@@ -109,15 +131,39 @@ static <T> Chunk<Values> extractChunkFromInputStream(
Double.BYTES, options,fieldNodeIter, bufferInfoIter, is);
case Object:
if (type.isArray()) {
return VarListChunkInputStreamGenerator.extractChunkFromInputStream(options, type, fieldNodeIter, bufferInfoIter, is) ;
return VarListChunkInputStreamGenerator.extractChunkFromInputStream(
options, type, fieldNodeIter, bufferInfoIter, is);
}

if (options.columnConversionMode().equals(BarrageSubscriptionOptions.ColumnConversionMode.Stringify)) {
if (type == BigInteger.class) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigInteger::new
);
}
if (type == BigDecimal.class) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(final byte[] buf, final int offset, final int length) -> {
// read the int scale value as little endian, arrow's endianness.
final byte b1 = buf[offset];
final byte b2 = buf[offset + 1];
final byte b3 = buf[offset + 2];
final byte b4 = buf[offset + 3];
final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
}
);
}
if (type == String.class ||
options.columnConversionMode().equals(BarrageSubscriptionOptions.ColumnConversionMode.Stringify)) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is, fieldNodeIter, bufferInfoIter,
(buf, off, len) -> new String(buf, off, len, Charsets.UTF_8));
} else {
throw new UnsupportedOperationException("Do not yet support column conversion mode: " + options.columnConversionMode());
}
throw new UnsupportedOperationException("Do not yet support column conversion mode: " + options.columnConversionMode());
default:
throw new UnsupportedOperationException();
}
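For reference, the new BigDecimal branch above encodes each value as a 4-byte little-endian scale (Arrow's endianness) followed by the big-endian two's-complement bytes of the unscaled value, after stripping trailing zeros; the extractChunkFromInputStream path above reverses this. A minimal standalone round-trip sketch of that byte layout (the class and method names here are ours, not part of the commit):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;

// Standalone sketch of the wire layout used for BigDecimal:
// [scale as 4-byte little-endian int][unscaled value as BigInteger.toByteArray()].
final class BigDecimalWireFormatSketch {
    static byte[] encode(final BigDecimal value) throws IOException {
        final BigDecimal normal = value.stripTrailingZeros();
        final int scale = normal.scale();
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        // scale, little endian
        out.write(0xFF & scale);
        out.write(0xFF & (scale >> 8));
        out.write(0xFF & (scale >> 16));
        out.write(0xFF & (scale >> 24));
        // unscaled value, big-endian two's complement
        out.write(normal.unscaledValue().toByteArray());
        return out.toByteArray();
    }

    static BigDecimal decode(final byte[] buf, final int offset, final int length) {
        // first four bytes are the little-endian scale
        final int scale = (buf[offset + 3] << 24)
                | ((buf[offset + 2] & 0xFF) << 16)
                | ((buf[offset + 1] & 0xFF) << 8)
                | (buf[offset] & 0xFF);
        // remaining bytes are the unscaled value
        return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
    }

    public static void main(final String[] args) throws IOException {
        final byte[] encoded = encode(new BigDecimal("3.0100"));
        // prints 3.01: trailing zeros are stripped before encoding
        System.out.println(decode(encoded, 0, encoded.length));
    }
}
```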
@@ -4,7 +4,6 @@

package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import com.google.common.io.LittleEndianDataOutputStream;
import gnu.trove.iterator.TLongIterator;
import io.deephaven.UncheckedDeephavenException;
@@ -37,15 +36,13 @@ public class VarBinaryChunkInputStreamGenerator<T> extends BaseChunkInputStreamG

private byte[] bytes;
private WritableIntChunk<ChunkPositions> offsets;
private byte[] stringBytes;
private WritableIntChunk<ChunkPositions> stringOffsets;

public interface Appender<T> {
void append(OutputStream out, T item) throws IOException;
}

public interface Mapper<T> {
T constructFrom(final byte[] buf, int offset, int length) throws IOException;
T constructFrom(byte[] buf, int offset, int length) throws IOException;
}

VarBinaryChunkInputStreamGenerator(final Class<T> type, final ObjectChunk<T, Values> chunk,
@@ -76,25 +73,6 @@ private synchronized void computePayload() throws IOException {
}
}

private synchronized void computeStringPayload() throws IOException {
if (stringBytes != null) {
return;
}

stringOffsets = WritableIntChunk.makeWritableChunk(chunk.size() + 1);

try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos = new BarrageProtoUtil.ExposedByteArrayOutputStream()) {
stringOffsets.set(0, 0);
for (int i = 0; i < chunk.size(); ++i) {
if (chunk.get(i) != null) {
baos.write(chunk.get(i).toString().getBytes(Charsets.UTF_8));
}
stringOffsets.set(i + 1, baos.size());
}
stringBytes = baos.peekBuffer();
}
}

@Override
public void close() {
if (REFERENCE_COUNT_UPDATER.decrementAndGet(this) == 0) {
@@ -104,21 +82,13 @@ public void close() {
if (offsets != null) {
offsets.close();
}
if (stringOffsets != null) {
stringOffsets.close();
}
}
}

@Override
public DrainableColumn getInputStream(final BarrageSubscriptionOptions options, final @Nullable RowSet subset) throws IOException {
if (type == String.class) {
computePayload();
return new ObjectChunkInputStream(options, offsets, bytes, subset);
}

computeStringPayload();
return new ObjectChunkInputStream(options, stringOffsets, stringBytes, subset);
computePayload();
return new ObjectChunkInputStream(options, offsets, bytes, subset);
}

private class ObjectChunkInputStream extends BaseChunkInputStream {
@@ -163,7 +163,7 @@ private static void pushColumnTypesFromAvroField(
}
final Schema.Type fieldType = fieldSchema.getType();
pushColumnTypesFromAvroField(
columnsOut, mappedOut, prefix, field, fieldName, fieldSchema, mappedName, fieldType,
columnsOut, mappedOut, prefix, fieldName, fieldSchema, mappedName, fieldType,
fieldNameToColumnName);

}
@@ -172,7 +172,6 @@ private static void pushColumnTypesFromAvroField(
final List<ColumnDefinition<?>> columnsOut,
final Map<String, String> mappedOut,
final String prefix,
final Schema.Field field,
final String fieldName,
final Schema fieldSchema,
final String mappedName,
@@ -207,13 +206,13 @@ private static void pushColumnTypesFromAvroField(
case UNION:
final Schema effectiveSchema = Utils.getEffectiveSchema(fieldName, fieldSchema);
pushColumnTypesFromAvroField(
columnsOut, mappedOut, prefix, field, fieldName, effectiveSchema, mappedName,
columnsOut, mappedOut, prefix, fieldName, effectiveSchema, mappedName,
effectiveSchema.getType(),
fieldNameToColumnName);
return;
case RECORD:
// Linearize any nesting.
for (final Schema.Field nestedField : field.schema().getFields()) {
for (final Schema.Field nestedField : fieldSchema.getFields()) {
pushColumnTypesFromAvroField(
columnsOut, mappedOut,
prefix + fieldName + NESTED_FIELD_NAME_SEPARATOR,
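The Avro change above fixes linearization of nested records: for a nullable nested field the raw field schema is a UNION, and Avro's getFields() is only valid on a RECORD schema, so the old field.schema().getFields() call failed; the RECORD case now recurses over fieldSchema, which has already been unwrapped to its effective (non-null) branch. A small illustration of the failure mode using the standard Apache Avro API (the schema and the printed prefix are made up for this example, and the inline unwrap stands in for Utils.getEffectiveSchema):

```java
import org.apache.avro.Schema;

// Illustrative only: shows why getFields() cannot be called on a union-wrapped
// (nullable) nested record schema, and why the non-null branch must be unwrapped first.
final class NestedAvroSketch {
    private static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Outer\",\"fields\":["
                    + "{\"name\":\"inner\",\"type\":[\"null\","
                    + "{\"type\":\"record\",\"name\":\"Inner\",\"fields\":["
                    + "{\"name\":\"price\",\"type\":\"double\"}]}]}]}";

    public static void main(final String[] args) {
        final Schema outer = new Schema.Parser().parse(SCHEMA_JSON);
        final Schema raw = outer.getField("inner").schema();

        // The raw field schema is a UNION of ["null", Inner]; calling raw.getFields()
        // here would throw org.apache.avro.AvroRuntimeException: Not a record.
        System.out.println(raw.getType()); // UNION

        // Unwrap the union to its non-null branch first, then linearize the nested fields.
        final Schema effective = raw.getTypes().stream()
                .filter(s -> s.getType() != Schema.Type.NULL)
                .findFirst()
                .orElseThrow(IllegalStateException::new);
        effective.getFields().forEach(f -> System.out.println("inner_" + f.name()));
    }
}
```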