Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: Extract factory interface for ChunkInputStreamGenerators #5811

Merged
5 commits merged on
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.deephaven.engine.rowset.impl.ExternalizableRowSetUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator;
import io.deephaven.extensions.barrage.chunk.DefaultChunkInputStreamGeneratorFactory;
import io.deephaven.extensions.barrage.chunk.SingleElementListHeaderInputStreamGenerator;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil.ExposedByteArrayOutputStream;
import io.deephaven.extensions.barrage.util.BarrageUtil;
Expand Down Expand Up @@ -122,9 +123,10 @@ public static class ModColumnGenerator implements SafeCloseable {
private final RowSetGenerator rowsModified;
private final ChunkListInputStreamGenerator data;

/**
 * Wraps one modified column of a {@link BarrageMessage}: the modified row set plus the serialized
 * data chunks for those rows.
 *
 * @param factory creates the per-chunk input stream generators used to serialize {@code col.data}
 * @param col the modified-column payload to wrap
 * @throws IOException if serializing the modified row set fails
 */
ModColumnGenerator(ChunkInputStreamGenerator.Factory factory, final BarrageMessage.ModColumnData col)
        throws IOException {
    rowsModified = new RowSetGenerator(col.rowsModified);
    data = new ChunkListInputStreamGenerator(factory, col.type, col.componentType, col.data, col.chunkType);
}

@Override
Expand Down Expand Up @@ -173,13 +175,15 @@ public BarrageStreamGeneratorImpl(final BarrageMessage message,
addColumnData = new ChunkListInputStreamGenerator[message.addColumnData.length];
for (int i = 0; i < message.addColumnData.length; ++i) {
BarrageMessage.AddColumnData columnData = message.addColumnData[i];
addColumnData[i] = new ChunkListInputStreamGenerator(columnData.type, columnData.componentType,
addColumnData[i] = new ChunkListInputStreamGenerator(DefaultChunkInputStreamGeneratorFactory.INSTANCE,
columnData.type, columnData.componentType,
columnData.data, columnData.chunkType);
}

modColumnData = new ModColumnGenerator[message.modColumnData.length];
for (int i = 0; i < modColumnData.length; ++i) {
modColumnData[i] = new ModColumnGenerator(message.modColumnData[i]);
modColumnData[i] = new ModColumnGenerator(DefaultChunkInputStreamGeneratorFactory.INSTANCE,
message.modColumnData[i]);
}
} catch (final IOException e) {
throw new UncheckedDeephavenException("unexpected IOException while creating barrage message stream", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@ public class ChunkListInputStreamGenerator implements SafeCloseable {
private final List<ChunkInputStreamGenerator> generators;
private final ChunkInputStreamGenerator emptyGenerator;

/**
 * Builds one {@link ChunkInputStreamGenerator} per chunk in {@code data}, plus a generator for an
 * empty chunk of the same shape (used when a subscriber's viewport selects no rows from a chunk).
 *
 * @param factory creates the concrete generator for each chunk
 * @param type the Java type of the column being written
 * @param componentType the Java type of data in an array/vector, or null if irrelevant
 * @param data the chunks to serialize; row offsets are assigned cumulatively in list order
 * @param chunkType the chunk type of every chunk in {@code data}
 */
public ChunkListInputStreamGenerator(ChunkInputStreamGenerator.Factory factory, Class<?> type,
        Class<?> componentType, List<Chunk<Values>> data,
        ChunkType chunkType) {
    // create an input stream generator for each chunk
    ChunkInputStreamGenerator[] generators = new ChunkInputStreamGenerator[data.size()];

    // each generator is tagged with the running row offset of its chunk within the full column
    long rowOffset = 0;
    for (int i = 0; i < data.size(); ++i) {
        final Chunk<Values> valuesChunk = data.get(i);
        generators[i] = factory.makeInputStreamGenerator(chunkType, type, componentType,
                valuesChunk, rowOffset);
        rowOffset += valuesChunk.size();
    }
    this.generators = Arrays.asList(generators);
    emptyGenerator = factory.makeInputStreamGenerator(
            chunkType, type, componentType, chunkType.getEmptyChunk(), 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSequenceFactory;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.util.pools.PoolableChunk;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,181 +3,47 @@
//
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ChunkType;
import io.deephaven.util.SafeCloseable;
import io.deephaven.vector.Vector;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;

public interface ChunkInputStreamGenerator extends SafeCloseable {
long MS_PER_DAY = 24 * 60 * 60 * 1000L;
long MIN_LOCAL_DATE_VALUE = QueryConstants.MIN_LONG / MS_PER_DAY;
long MAX_LOCAL_DATE_VALUE = QueryConstants.MAX_LONG / MS_PER_DAY;

static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
final ChunkType chunkType,
final Class<T> type,
final Class<?> componentType,
final Chunk<Values> chunk,
final long rowOffset) {
// TODO (deephaven-core#5453): pass in ArrowType to enable ser/deser of single java class in multiple formats
switch (chunkType) {
case Boolean:
throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
case Char:
return new CharChunkInputStreamGenerator(chunk.asCharChunk(), Character.BYTES, rowOffset);
case Byte:
if (type == Boolean.class || type == boolean.class) {
// internally we represent booleans as bytes, but the wire format respects arrow's specification
return new BooleanChunkInputStreamGenerator(chunk.asByteChunk(), rowOffset);
}
return new ByteChunkInputStreamGenerator(chunk.asByteChunk(), Byte.BYTES, rowOffset);
case Short:
return new ShortChunkInputStreamGenerator(chunk.asShortChunk(), Short.BYTES, rowOffset);
case Int:
return new IntChunkInputStreamGenerator(chunk.asIntChunk(), Integer.BYTES, rowOffset);
case Long:
return new LongChunkInputStreamGenerator(chunk.asLongChunk(), Long.BYTES, rowOffset);
case Float:
return new FloatChunkInputStreamGenerator(chunk.asFloatChunk(), Float.BYTES, rowOffset);
case Double:
return new DoubleChunkInputStreamGenerator(chunk.asDoubleChunk(), Double.BYTES, rowOffset);
case Object:
if (type.isArray()) {
if (componentType == byte.class) {
return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write((byte[]) item));
} else {
return new VarListChunkInputStreamGenerator<>(type, chunk.asObjectChunk(), rowOffset);
}
}
if (Vector.class.isAssignableFrom(type)) {
// noinspection unchecked
return new VectorChunkInputStreamGenerator(
(Class<Vector<?>>) type, componentType, chunk.asObjectChunk(), rowOffset);
}
if (type == String.class) {
return new VarBinaryChunkInputStreamGenerator<String>(chunk.asObjectChunk(), rowOffset,
(out, str) -> out.write(str.getBytes(Charsets.UTF_8)));
}
if (type == BigInteger.class) {
return new VarBinaryChunkInputStreamGenerator<BigInteger>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write(item.toByteArray()));
}
if (type == BigDecimal.class) {
return new VarBinaryChunkInputStreamGenerator<BigDecimal>(chunk.asObjectChunk(), rowOffset,
(out, item) -> {
final BigDecimal normal = item.stripTrailingZeros();
final int v = normal.scale();
// Write as little endian, arrow endianness.
out.write(0xFF & v);
out.write(0xFF & (v >> 8));
out.write(0xFF & (v >> 16));
out.write(0xFF & (v >> 24));
out.write(normal.unscaledValue().toByteArray());
});
}
if (type == Instant.class) {
// This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted.
ObjectChunk<Instant, Values> objChunk = chunk.asObjectChunk();
WritableLongChunk<Values> outChunk = WritableLongChunk.makeWritableChunk(objChunk.size());
for (int i = 0; i < objChunk.size(); ++i) {
outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i)));
}
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset);
}
if (type == ZonedDateTime.class) {
// This code path is utilized for arrays and vectors of Instant, which cannot be reinterpreted.
ObjectChunk<ZonedDateTime, Values> objChunk = chunk.asObjectChunk();
WritableLongChunk<Values> outChunk = WritableLongChunk.makeWritableChunk(objChunk.size());
for (int i = 0; i < objChunk.size(); ++i) {
outChunk.set(i, DateTimeUtils.epochNanos(objChunk.get(i)));
}
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset);
}
if (type == Boolean.class) {
return BooleanChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Byte.class) {
return ByteChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Character.class) {
return CharChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Double.class) {
return DoubleChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Float.class) {
return FloatChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Integer.class) {
return IntChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Long.class) {
return LongChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Short.class) {
return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == LocalDate.class) {
return LongChunkInputStreamGenerator.<LocalDate>convertWithTransform(chunk.asObjectChunk(),
rowOffset, date -> {
if (date == null) {
return QueryConstants.NULL_LONG;
}
final long epochDay = date.toEpochDay();
if (epochDay < MIN_LOCAL_DATE_VALUE || epochDay > MAX_LOCAL_DATE_VALUE) {
throw new IllegalArgumentException("Date out of range: " + date + " (" + epochDay
+ " not in [" + MIN_LOCAL_DATE_VALUE + ", " + MAX_LOCAL_DATE_VALUE + "])");
}
return epochDay * MS_PER_DAY;
});
}
if (type == LocalTime.class) {
return LongChunkInputStreamGenerator.<LocalTime>convertWithTransform(chunk.asObjectChunk(),
rowOffset, time -> {
if (time == null) {
return QueryConstants.NULL_LONG;
}
final long nanoOfDay = time.toNanoOfDay();
if (nanoOfDay < 0) {
throw new IllegalArgumentException("Time out of range: " + time);
}
return nanoOfDay;
});
}
// TODO (core#936): support column conversion modes

return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write(item.toString().getBytes(Charsets.UTF_8)));
default:
throw new UnsupportedOperationException();
}
/**
* Creator of {@link ChunkInputStreamGenerator} instances.
* <p>
* This API may not be stable, while the JS API's usages of it are implemented.
*/
interface Factory {
/**
* Returns an instance capable of writing the given chunk
*
* @param chunkType the type of the chunk to be written
* @param type the Java type of the column being written
* @param componentType the Java type of data in an array/vector, or null if irrelevant
* @param chunk the chunk that will be written out to an input stream
* @param rowOffset the offset into the chunk to start writing from
* @return an instance capable of serializing the given chunk
* @param <T> the type of data in the column
*/
<T> ChunkInputStreamGenerator makeInputStreamGenerator(
final ChunkType chunkType,
final Class<T> type,
final Class<?> componentType,
final Chunk<Values> chunk,
Comment on lines +42 to +45
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we read the chunk type from the chunk?

Future follow-up: use the ChunkReader.TypeInfo here instead of type/componentType/chunkType (and possible future need for arrow Field)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TypeInfo is good if it has the arrow field, too. If Chunk has a getChunkType.. oops; feel free to use it instead.

final long rowOffset);
}

/**
Expand Down
Loading
Loading