diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index 9aa0c9376c5..910c1b9365b 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -25,6 +25,7 @@ import io.deephaven.engine.rowset.impl.ExternalizableRowSetUtils; import io.deephaven.engine.table.impl.util.BarrageMessage; import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator; +import io.deephaven.extensions.barrage.chunk.DefaultChunkInputStreamGeneratorFactory; import io.deephaven.extensions.barrage.chunk.SingleElementListHeaderInputStreamGenerator; import io.deephaven.extensions.barrage.util.BarrageProtoUtil.ExposedByteArrayOutputStream; import io.deephaven.extensions.barrage.util.BarrageUtil; @@ -122,9 +123,10 @@ public static class ModColumnGenerator implements SafeCloseable { private final RowSetGenerator rowsModified; private final ChunkListInputStreamGenerator data; - ModColumnGenerator(final BarrageMessage.ModColumnData col) throws IOException { + ModColumnGenerator(ChunkInputStreamGenerator.Factory factory, final BarrageMessage.ModColumnData col) + throws IOException { rowsModified = new RowSetGenerator(col.rowsModified); - data = new ChunkListInputStreamGenerator(col.type, col.componentType, col.data, col.chunkType); + data = new ChunkListInputStreamGenerator(factory, col.type, col.componentType, col.data, col.chunkType); } @Override @@ -173,13 +175,15 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message, addColumnData = new ChunkListInputStreamGenerator[message.addColumnData.length]; for (int i = 0; i < message.addColumnData.length; ++i) { BarrageMessage.AddColumnData columnData = message.addColumnData[i]; - addColumnData[i] = new ChunkListInputStreamGenerator(columnData.type, columnData.componentType, + addColumnData[i] = new ChunkListInputStreamGenerator(DefaultChunkInputStreamGeneratorFactory.INSTANCE, + columnData.type, columnData.componentType, columnData.data, columnData.chunkType); } modColumnData = new ModColumnGenerator[message.modColumnData.length]; for (int i = 0; i < modColumnData.length; ++i) { - modColumnData[i] = new ModColumnGenerator(message.modColumnData[i]); + modColumnData[i] = new ModColumnGenerator(DefaultChunkInputStreamGeneratorFactory.INSTANCE, + message.modColumnData[i]); } } catch (final IOException e) { throw new UncheckedDeephavenException("unexpected IOException while creating barrage message stream", e); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java index f64be56149f..a5b95f2c524 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ChunkListInputStreamGenerator.java @@ -19,7 +19,8 @@ public class ChunkListInputStreamGenerator implements SafeCloseable { private final List generators; private final ChunkInputStreamGenerator emptyGenerator; - public ChunkListInputStreamGenerator(Class type, Class componentType, List> data, + public ChunkListInputStreamGenerator(ChunkInputStreamGenerator.Factory factory, Class type, + Class componentType, List> data, ChunkType chunkType) { // create an input stream generator for each chunk ChunkInputStreamGenerator[] generators = new ChunkInputStreamGenerator[data.size()]; @@ -27,12 +28,12 @@ public ChunkListInputStreamGenerator(Class type, Class componentType, List long rowOffset = 0; for (int i = 0; i < data.size(); ++i) { final Chunk valuesChunk = data.get(i); - generators[i] = ChunkInputStreamGenerator.makeInputStreamGenerator(chunkType, type, componentType, + generators[i] = factory.makeInputStreamGenerator(chunkType, type, componentType, valuesChunk, rowOffset); rowOffset += valuesChunk.size(); } this.generators = Arrays.asList(generators); - emptyGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( + emptyGenerator = factory.makeInputStreamGenerator( chunkType, type, componentType, chunkType.getEmptyChunk(), 0); } diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkInputStreamGenerator.java index ab4c5bd20b8..5f8c65c374a 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/BaseChunkInputStreamGenerator.java @@ -4,13 +4,13 @@ package io.deephaven.extensions.barrage.chunk; import io.deephaven.chunk.attributes.Values; +import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.rowset.RowSequence; import io.deephaven.engine.rowset.RowSequenceFactory; import io.deephaven.engine.rowset.RowSet; import io.deephaven.extensions.barrage.util.StreamReaderOptions; import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.Chunk; -import io.deephaven.chunk.util.pools.PoolableChunk; import java.io.IOException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java index e1f9039b47e..bfd22d342e4 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/ChunkInputStreamGenerator.java @@ -3,181 +3,47 @@ // package io.deephaven.extensions.barrage.chunk; -import com.google.common.base.Charsets; -import io.deephaven.chunk.ObjectChunk; -import io.deephaven.chunk.WritableLongChunk; import io.deephaven.chunk.attributes.Values; -import io.deephaven.chunk.util.pools.PoolableChunk; import io.deephaven.engine.rowset.RowSet; import io.deephaven.extensions.barrage.util.DefensiveDrainable; import io.deephaven.extensions.barrage.util.StreamReaderOptions; -import io.deephaven.time.DateTimeUtils; import io.deephaven.util.QueryConstants; import io.deephaven.util.datastructures.LongSizedDataStructure; import io.deephaven.chunk.Chunk; import io.deephaven.chunk.ChunkType; import io.deephaven.util.SafeCloseable; -import io.deephaven.vector.Vector; import org.jetbrains.annotations.Nullable; import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalTime; -import java.time.ZonedDateTime; public interface ChunkInputStreamGenerator extends SafeCloseable { long MS_PER_DAY = 24 * 60 * 60 * 1000L; long MIN_LOCAL_DATE_VALUE = QueryConstants.MIN_LONG / MS_PER_DAY; long MAX_LOCAL_DATE_VALUE = QueryConstants.MAX_LONG / MS_PER_DAY; - static ChunkInputStreamGenerator makeInputStreamGenerator( - final ChunkType chunkType, - final Class type, - final Class componentType, - final Chunk chunk, - final long rowOffset) { - // TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats - switch (chunkType) { - case Boolean: - throw new UnsupportedOperationException("Booleans are reinterpreted as bytes"); - case Char: - return new CharChunkInputStreamGenerator(chunk.asCharChunk(), Character.BYTES, rowOffset); - case Byte: - if (type == Boolean.class || type == boolean.class) { - // internally we represent booleans as bytes, but the wire format respects arrow's specification - return new BooleanChunkInputStreamGenerator(chunk.asByteChunk(), rowOffset); - } - return new ByteChunkInputStreamGenerator(chunk.asByteChunk(), Byte.BYTES, rowOffset); - case Short: - return new ShortChunkInputStreamGenerator(chunk.asShortChunk(), Short.BYTES, rowOffset); - case Int: - return new IntChunkInputStreamGenerator(chunk.asIntChunk(), Integer.BYTES, rowOffset); - case Long: - return new LongChunkInputStreamGenerator(chunk.asLongChunk(), Long.BYTES, rowOffset); - case Float: - return new FloatChunkInputStreamGenerator(chunk.asFloatChunk(), Float.BYTES, rowOffset); - case Double: - return new DoubleChunkInputStreamGenerator(chunk.asDoubleChunk(), Double.BYTES, rowOffset); - case Object: - if (type.isArray()) { - if (componentType == byte.class) { - return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset, - (out, item) -> out.write((byte[]) item)); - } else { - return new VarListChunkInputStreamGenerator<>(type, chunk.asObjectChunk(), rowOffset); - } - } - if (Vector.class.isAssignableFrom(type)) { - // noinspection unchecked - return new VectorChunkInputStreamGenerator( - (Class>) type, componentType, chunk.asObjectChunk(), rowOffset); - } - if (type == String.class) { - return new VarBinaryChunkInputStreamGenerator(chunk.asObjectChunk(), rowOffset, - (out, str) -> out.write(str.getBytes(Charsets.UTF_8))); - } - if (type == BigInteger.class) { - return new VarBinaryChunkInputStreamGenerator(chunk.asObjectChunk(), rowOffset, - (out, item) -> out.write(item.toByteArray())); - } - if (type == BigDecimal.class) { - return new VarBinaryChunkInputStreamGenerator(chunk.asObjectChunk(), rowOffset, - (out, item) -> { - final BigDecimal normal = item.stripTrailingZeros(); - final int v = normal.scale(); - // Write as little endian, arrow endianness. - out.write(0xFF & v); - out.write(0xFF & (v >> 8)); - out.write(0xFF & (v >> 16)); - out.write(0xFF & (v >> 24)); - out.write(normal.unscaledValue().toByteArray()); - }); - } - if (type == Instant.class) { - // This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted. - ObjectChunk objChunk = chunk.asObjectChunk(); - WritableLongChunk outChunk = WritableLongChunk.makeWritableChunk(objChunk.size()); - for (int i = 0; i < objChunk.size(); ++i) { - outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i))); - } - if (chunk instanceof PoolableChunk) { - ((PoolableChunk) chunk).close(); - } - return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset); - } - if (type == ZonedDateTime.class) { - // This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted. - ObjectChunk objChunk = chunk.asObjectChunk(); - WritableLongChunk outChunk = WritableLongChunk.makeWritableChunk(objChunk.size()); - for (int i = 0; i < objChunk.size(); ++i) { - outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i))); - } - if (chunk instanceof PoolableChunk) { - ((PoolableChunk) chunk).close(); - } - return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset); - } - if (type == Boolean.class) { - return BooleanChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); - } - if (type == Byte.class) { - return ByteChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); - } - if (type == Character.class) { - return CharChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); - } - if (type == Double.class) { - return DoubleChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); - } - if (type == Float.class) { - return FloatChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); - } - if (type == Integer.class) { - return IntChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); - } - if (type == Long.class) { - return LongChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); - } - if (type == Short.class) { - return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); - } - if (type == LocalDate.class) { - return LongChunkInputStreamGenerator.convertWithTransform(chunk.asObjectChunk(), - rowOffset, date -> { - if (date == null) { - return QueryConstants.NULL_LONG; - } - final long epochDay = date.toEpochDay(); - if (epochDay < MIN_LOCAL_DATE_VALUE || epochDay > MAX_LOCAL_DATE_VALUE) { - throw new IllegalArgumentException("Date out of range: " + date + " (" + epochDay - + " not in [" + MIN_LOCAL_DATE_VALUE + ", " + MAX_LOCAL_DATE_VALUE + "])"); - } - return epochDay * MS_PER_DAY; - }); - } - if (type == LocalTime.class) { - return LongChunkInputStreamGenerator.convertWithTransform(chunk.asObjectChunk(), - rowOffset, time -> { - if (time == null) { - return QueryConstants.NULL_LONG; - } - final long nanoOfDay = time.toNanoOfDay(); - if (nanoOfDay < 0) { - throw new IllegalArgumentException("Time out of range: " + time); - } - return nanoOfDay; - }); - } - // TODO (core#936): support column conversion modes - - return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset, - (out, item) -> out.write(item.toString().getBytes(Charsets.UTF_8))); - default: - throw new UnsupportedOperationException(); - } + /** + * Creator of {@link ChunkInputStreamGenerator} instances. + *

+ * This API may not be stable, while the JS API's usages of it are implemented. + */ + interface Factory { + /** + * Returns an instance capable of writing the given chunk + * + * @param chunkType the type of the chunk to be written + * @param type the Java type of the column being written + * @param componentType the Java type of data in an array/vector, or null if irrelevant + * @param chunk the chunk that will be written out to an input stream + * @param rowOffset the offset into the chunk to start writing from + * @return an instance capable of serializing the given chunk + * @param the type of data in the column + */ + ChunkInputStreamGenerator makeInputStreamGenerator( + final ChunkType chunkType, + final Class type, + final Class componentType, + final Chunk chunk, + final long rowOffset); } /** diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkInputStreamGeneratorFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkInputStreamGeneratorFactory.java new file mode 100644 index 00000000000..8255b870fc1 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkInputStreamGeneratorFactory.java @@ -0,0 +1,178 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage.chunk; + +import com.google.common.base.Charsets; +import io.deephaven.chunk.Chunk; +import io.deephaven.chunk.ChunkType; +import io.deephaven.chunk.ObjectChunk; +import io.deephaven.chunk.WritableLongChunk; +import io.deephaven.chunk.attributes.Values; +import io.deephaven.chunk.util.pools.PoolableChunk; +import io.deephaven.time.DateTimeUtils; +import io.deephaven.util.QueryConstants; +import io.deephaven.vector.Vector; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZonedDateTime; + +import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MAX_LOCAL_DATE_VALUE; +import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MIN_LOCAL_DATE_VALUE; +import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MS_PER_DAY; + +/** + * JVM implementation of ChunkInputStreamGenerator.Factory, suitable for use in Java clients and servers. + */ +public class DefaultChunkInputStreamGeneratorFactory implements ChunkInputStreamGenerator.Factory { + public static final DefaultChunkInputStreamGeneratorFactory INSTANCE = + new DefaultChunkInputStreamGeneratorFactory(); + + @Override + public ChunkInputStreamGenerator makeInputStreamGenerator(ChunkType chunkType, Class type, + Class componentType, Chunk chunk, long rowOffset) { + // TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats + switch (chunkType) { + case Boolean: + throw new UnsupportedOperationException("Booleans are reinterpreted as bytes"); + case Char: + return new CharChunkInputStreamGenerator(chunk.asCharChunk(), Character.BYTES, rowOffset); + case Byte: + if (type == Boolean.class || type == boolean.class) { + // internally we represent booleans as bytes, but the wire format respects arrow's specification + return new BooleanChunkInputStreamGenerator(chunk.asByteChunk(), rowOffset); + } + return new ByteChunkInputStreamGenerator(chunk.asByteChunk(), Byte.BYTES, rowOffset); + case Short: + return new ShortChunkInputStreamGenerator(chunk.asShortChunk(), Short.BYTES, rowOffset); + case Int: + return new IntChunkInputStreamGenerator(chunk.asIntChunk(), Integer.BYTES, rowOffset); + case Long: + return new LongChunkInputStreamGenerator(chunk.asLongChunk(), Long.BYTES, rowOffset); + case Float: + return new FloatChunkInputStreamGenerator(chunk.asFloatChunk(), Float.BYTES, rowOffset); + case Double: + return new DoubleChunkInputStreamGenerator(chunk.asDoubleChunk(), Double.BYTES, rowOffset); + case Object: + if (type.isArray()) { + if (componentType == byte.class) { + return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset, + (out, item) -> out.write((byte[]) item)); + } else { + return new VarListChunkInputStreamGenerator<>(this, type, chunk.asObjectChunk(), rowOffset); + } + } + if (Vector.class.isAssignableFrom(type)) { + // noinspection unchecked + return new VectorChunkInputStreamGenerator(this, + (Class>) type, componentType, chunk.asObjectChunk(), rowOffset); + } + if (type == String.class) { + return new VarBinaryChunkInputStreamGenerator(chunk.asObjectChunk(), rowOffset, + (out, str) -> out.write(str.getBytes(Charsets.UTF_8))); + } + if (type == BigInteger.class) { + return new VarBinaryChunkInputStreamGenerator(chunk.asObjectChunk(), rowOffset, + (out, item) -> out.write(item.toByteArray())); + } + if (type == BigDecimal.class) { + return new VarBinaryChunkInputStreamGenerator(chunk.asObjectChunk(), rowOffset, + (out, item) -> { + final BigDecimal normal = item.stripTrailingZeros(); + final int v = normal.scale(); + // Write as little endian, arrow endianness. + out.write(0xFF & v); + out.write(0xFF & (v >> 8)); + out.write(0xFF & (v >> 16)); + out.write(0xFF & (v >> 24)); + out.write(normal.unscaledValue().toByteArray()); + }); + } + if (type == Instant.class) { + // This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted. + ObjectChunk objChunk = chunk.asObjectChunk(); + WritableLongChunk outChunk = WritableLongChunk.makeWritableChunk(objChunk.size()); + for (int i = 0; i < objChunk.size(); ++i) { + outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i))); + } + if (chunk instanceof PoolableChunk) { + ((PoolableChunk) chunk).close(); + } + return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset); + } + if (type == ZonedDateTime.class) { + // This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted. + ObjectChunk objChunk = chunk.asObjectChunk(); + WritableLongChunk outChunk = WritableLongChunk.makeWritableChunk(objChunk.size()); + for (int i = 0; i < objChunk.size(); ++i) { + outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i))); + } + if (chunk instanceof PoolableChunk) { + ((PoolableChunk) chunk).close(); + } + return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset); + } + if (type == Boolean.class) { + return BooleanChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); + } + if (type == Byte.class) { + return ByteChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); + } + if (type == Character.class) { + return CharChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); + } + if (type == Double.class) { + return DoubleChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); + } + if (type == Float.class) { + return FloatChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); + } + if (type == Integer.class) { + return IntChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); + } + if (type == Long.class) { + return LongChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); + } + if (type == Short.class) { + return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset); + } + if (type == LocalDate.class) { + return LongChunkInputStreamGenerator.convertWithTransform(chunk.asObjectChunk(), + rowOffset, date -> { + if (date == null) { + return QueryConstants.NULL_LONG; + } + final long epochDay = date.toEpochDay(); + if (epochDay < MIN_LOCAL_DATE_VALUE || epochDay > MAX_LOCAL_DATE_VALUE) { + throw new IllegalArgumentException("Date out of range: " + date + " (" + epochDay + + " not in [" + MIN_LOCAL_DATE_VALUE + ", " + MAX_LOCAL_DATE_VALUE + "])"); + } + return epochDay * MS_PER_DAY; + }); + } + if (type == LocalTime.class) { + return LongChunkInputStreamGenerator.convertWithTransform(chunk.asObjectChunk(), + rowOffset, time -> { + if (time == null) { + return QueryConstants.NULL_LONG; + } + final long nanoOfDay = time.toNanoOfDay(); + if (nanoOfDay < 0) { + throw new IllegalArgumentException("Time out of range: " + time); + } + return nanoOfDay; + }); + } + // TODO (core#936): support column conversion modes + + return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset, + (out, item) -> out.write(item.toString().getBytes(Charsets.UTF_8))); + default: + throw new UnsupportedOperationException(); + } + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java index f2278221dc6..dc1a7895aea 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/DefaultChunkReadingFactory.java @@ -22,9 +22,9 @@ import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MS_PER_DAY; /** - * JVM implementation of ChunkReadingFactory, suitable for use in Java clients and servers. This default implementations - * may not round trip flight types correctly, but will round trip Deephaven table definitions and table data. Neither of - * these is a required/expected property of being a Flight/Barrage/Deephaven client. + * JVM implementation of {@link ChunkReader.Factory}, suitable for use in Java clients and servers. This default + * implementation may not round trip flight types in a stable way, but will round trip Deephaven table definitions and + * table data. Neither of these is a required/expected property of being a Flight/Barrage/Deephaven client. */ public final class DefaultChunkReadingFactory implements ChunkReader.Factory { public static final ChunkReader.Factory INSTANCE = new DefaultChunkReadingFactory(); diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java index 2e8f90dc035..20dbea517c0 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VarListChunkInputStreamGenerator.java @@ -21,22 +21,22 @@ import io.deephaven.util.mutable.MutableInt; import org.jetbrains.annotations.Nullable; -import java.io.DataInput; import java.io.IOException; import java.io.OutputStream; -import java.util.Iterator; -import java.util.PrimitiveIterator; public class VarListChunkInputStreamGenerator extends BaseChunkInputStreamGenerator> { private static final String DEBUG_NAME = "VarListChunkInputStreamGenerator"; + private final Factory factory; private final Class type; private WritableIntChunk offsets; private ChunkInputStreamGenerator innerGenerator; - VarListChunkInputStreamGenerator(final Class type, final ObjectChunk chunk, final long rowOffset) { + VarListChunkInputStreamGenerator(ChunkInputStreamGenerator.Factory factory, final Class type, + final ObjectChunk chunk, final long rowOffset) { super(chunk, 0, rowOffset); + this.factory = factory; this.type = type; } @@ -56,8 +56,7 @@ private synchronized void computePayload() { offsets = WritableIntChunk.makeWritableChunk(chunk.size() + 1); final WritableChunk innerChunk = kernel.expand(chunk, offsets); - innerGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( - chunkType, myType, myComponentType, innerChunk, 0); + innerGenerator = factory.makeInputStreamGenerator(chunkType, myType, myComponentType, innerChunk, 0); } @Override diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java index ae375ed46be..a2f9378ae4c 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/chunk/VectorChunkInputStreamGenerator.java @@ -29,16 +29,19 @@ public class VectorChunkInputStreamGenerator extends BaseChunkInputStreamGenerat private static final String DEBUG_NAME = "VarListChunkInputStreamGenerator"; private final Class componentType; + private final Factory factory; private WritableIntChunk offsets; private ChunkInputStreamGenerator innerGenerator; VectorChunkInputStreamGenerator( + final ChunkInputStreamGenerator.Factory factory, final Class> type, final Class componentType, final ObjectChunk, Values> chunk, final long rowOffset) { super(chunk, 0, rowOffset); + this.factory = factory; this.componentType = VectorExpansionKernel.getComponentType(type, componentType); } @@ -53,8 +56,7 @@ private synchronized void computePayload() { offsets = WritableIntChunk.makeWritableChunk(chunk.size() + 1); final WritableChunk innerChunk = kernel.expand(chunk, offsets); - innerGenerator = ChunkInputStreamGenerator.makeInputStreamGenerator( - chunkType, componentType, innerComponentType, innerChunk, 0); + innerGenerator = factory.makeInputStreamGenerator(chunkType, componentType, innerComponentType, innerChunk, 0); } @Override diff --git a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/chunk/BarrageColumnRoundTripTest.java b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/chunk/BarrageColumnRoundTripTest.java index feb5e09c969..e34550627af 100644 --- a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/chunk/BarrageColumnRoundTripTest.java +++ b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/chunk/BarrageColumnRoundTripTest.java @@ -681,8 +681,8 @@ private static void testRoundTripSerialization( data.copyFromChunk(srcData, 0, 0, srcData.size()); try (SafeCloseable ignored = data; - ChunkInputStreamGenerator generator = ChunkInputStreamGenerator.makeInputStreamGenerator( - chunkType, type, type.getComponentType(), srcData, 0)) { + ChunkInputStreamGenerator generator = DefaultChunkInputStreamGeneratorFactory.INSTANCE + .makeInputStreamGenerator(chunkType, type, type.getComponentType(), srcData, 0)) { // full sub logic try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos = new BarrageProtoUtil.ExposedByteArrayOutputStream();