Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Barrage: apply redirection mapping is broken into disjoint groups #2942

Merged
merged 9 commits into from
Oct 11, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@
*/
public class SizeException extends UncheckedDeephavenException {

private final long maximumSize;

/**
* Construct an exception, with a message appropriate for the given arguments.
*
*
* @param messagePrefix An optional prefix for the message
* @param inputSize The input size for the message
* @param maximumSize The maximum size for the message
*/
public SizeException(@Nullable final String messagePrefix, final long inputSize, final long maximumSize) {
super((messagePrefix == null ? "" : messagePrefix + ": ") + "Input size " + inputSize + " larger than maximum "
+ maximumSize);
this.maximumSize = maximumSize;
}

/**
* Construct an exception, with a message appropriate for the given arguments. Maximum size is assumed to be
* {@link Integer#MAX_VALUE}.
*
*
* @param messagePrefix An optional prefix for the message
* @param inputSize The input size for the message
*/
Expand All @@ -40,11 +43,20 @@ public SizeException(@Nullable final String messagePrefix, final long inputSize)
/**
* Construct an exception, with a message appropriate for the given arguments. Maximum size is assumed to be
* {@link Integer#MAX_VALUE}, and no prefix is included.
*
*
* @param inputSize The input size for the message
*/
@SuppressWarnings("unused")
public SizeException(final long inputSize) {
this(null, inputSize, Integer.MAX_VALUE);
}

/**
* Get the maximum size that was exceeded.
*
* @return The maximum size
*/
public long getMaximumSize() {
return maximumSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package io.deephaven.engine.table.impl.by.ssmcountdistinct.distinct;

import io.deephaven.engine.table.impl.sources.BoxedColumnSource;
import io.deephaven.time.DateTime;
import io.deephaven.engine.table.impl.by.ssmcountdistinct.DateTimeSsmSourceWrapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package io.deephaven.engine.table.impl.by.ssmcountdistinct.distinct;

import io.deephaven.engine.table.impl.sources.BoxedColumnSource;
import io.deephaven.time.DateTime;
import io.deephaven.engine.table.impl.by.ssmcountdistinct.DateTimeSsmSourceWrapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import io.deephaven.engine.table.impl.sources.BoxedColumnSource;
import io.deephaven.time.DateTime;
import io.deephaven.engine.table.impl.by.ssmcountdistinct.DateTimeSsmSourceWrapper;

import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.rowset.RowSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import io.deephaven.engine.table.impl.sources.BoxedColumnSource;
import io.deephaven.time.DateTime;
import io.deephaven.engine.table.impl.by.ssmcountdistinct.DateTimeSsmSourceWrapper;

import io.deephaven.engine.rowset.WritableRowSet;
import io.deephaven.engine.rowset.RowSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
*/
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSequenceFactory;
Expand All @@ -28,13 +27,25 @@ public abstract class BaseChunkInputStreamGenerator<T extends Chunk<Values>> imp
protected static final AtomicIntegerFieldUpdater<BaseChunkInputStreamGenerator> REFERENCE_COUNT_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(BaseChunkInputStreamGenerator.class, "refCount");

protected final T chunk;
protected final int elementSize;

protected final T chunk;
private final long rowOffset;

BaseChunkInputStreamGenerator(final T chunk, final int elementSize) {
BaseChunkInputStreamGenerator(final T chunk, final int elementSize, final long rowOffset) {
this.chunk = chunk;
this.elementSize = elementSize;
this.rowOffset = rowOffset;
}

@Override
public long getRowOffset() {
return rowOffset;
}

@Override
public long getLastRowOffset() {
return rowOffset + chunk.size() - 1;
}

@Override
Expand Down Expand Up @@ -65,7 +76,11 @@ abstract class BaseChunkInputStream extends DrainableColumn {
this.options = options;
this.subset = chunk.size() == 0 ? RowSequenceFactory.EMPTY : subset != null ? subset.copy() : RowSequenceFactory.forRange(0, chunk.size() - 1);
REFERENCE_COUNT_UPDATER.incrementAndGet(BaseChunkInputStreamGenerator.this);
Assert.leq(this.subset.lastRowKey(), "this.subset.lastRowKey()", Integer.MAX_VALUE, "Integer.MAX_VALUE");
// ignore the empty chunk as these are intentionally empty generators that should work for any subset
if (chunk.size() > 0 && this.subset.lastRowKey() >= chunk.size()) {
throw new IllegalStateException(
"Subset " + this.subset + " is out of bounds for chunk of size " + chunk.size());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
public class ByteChunkInputStreamGenerator extends BaseChunkInputStreamGenerator<ByteChunk<Values>> {
private static final String DEBUG_NAME = "ByteChunkInputStreamGenerator";

public static ByteChunkInputStreamGenerator convertBoxed(final ObjectChunk<Byte, Values> inChunk) {
public static ByteChunkInputStreamGenerator convertBoxed(
final ObjectChunk<Byte, Values> inChunk, final long rowOffset) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
WritableByteChunk<Values> outChunk = WritableByteChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
Expand All @@ -44,11 +45,11 @@ public static ByteChunkInputStreamGenerator convertBoxed(final ObjectChunk<Byte,
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new ByteChunkInputStreamGenerator(outChunk, Byte.BYTES);
return new ByteChunkInputStreamGenerator(outChunk, Byte.BYTES, rowOffset);
}

ByteChunkInputStreamGenerator(final ByteChunk<Values> chunk, final int elementSize) {
super(chunk, elementSize);
ByteChunkInputStreamGenerator(final ByteChunk<Values> chunk, final int elementSize, final long rowOffset) {
super(chunk, elementSize, rowOffset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
public class CharChunkInputStreamGenerator extends BaseChunkInputStreamGenerator<CharChunk<Values>> {
private static final String DEBUG_NAME = "CharChunkInputStreamGenerator";

public static CharChunkInputStreamGenerator convertBoxed(final ObjectChunk<Character, Values> inChunk) {
public static CharChunkInputStreamGenerator convertBoxed(
final ObjectChunk<Character, Values> inChunk, final long rowOffset) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
WritableCharChunk<Values> outChunk = WritableCharChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
Expand All @@ -39,11 +40,11 @@ public static CharChunkInputStreamGenerator convertBoxed(final ObjectChunk<Chara
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new CharChunkInputStreamGenerator(outChunk, Character.BYTES);
return new CharChunkInputStreamGenerator(outChunk, Character.BYTES, rowOffset);
}

CharChunkInputStreamGenerator(final CharChunk<Values> chunk, final int elementSize) {
super(chunk, elementSize);
CharChunkInputStreamGenerator(final CharChunk<Values> chunk, final int elementSize, final long rowOffset) {
super(chunk, elementSize, rowOffset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,53 +33,57 @@
public interface ChunkInputStreamGenerator extends SafeCloseable {

static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
final ChunkType chunkType, final Class<T> type, final Class<?> componentType, final Chunk<Values> chunk) {
final ChunkType chunkType,
final Class<T> type,
final Class<?> componentType,
final Chunk<Values> chunk,
final long rowOffset) {
switch (chunkType) {
case Boolean:
throw new UnsupportedOperationException("Booleans are reinterpreted as bytes");
case Char:
return new CharChunkInputStreamGenerator(chunk.asCharChunk(), Character.BYTES);
return new CharChunkInputStreamGenerator(chunk.asCharChunk(), Character.BYTES, rowOffset);
case Byte:
return new ByteChunkInputStreamGenerator(chunk.asByteChunk(), Byte.BYTES);
return new ByteChunkInputStreamGenerator(chunk.asByteChunk(), Byte.BYTES, rowOffset);
case Short:
return new ShortChunkInputStreamGenerator(chunk.asShortChunk(), Short.BYTES);
return new ShortChunkInputStreamGenerator(chunk.asShortChunk(), Short.BYTES, rowOffset);
case Int:
return new IntChunkInputStreamGenerator(chunk.asIntChunk(), Integer.BYTES);
return new IntChunkInputStreamGenerator(chunk.asIntChunk(), Integer.BYTES, rowOffset);
case Long:
return new LongChunkInputStreamGenerator(chunk.asLongChunk(), Long.BYTES);
return new LongChunkInputStreamGenerator(chunk.asLongChunk(), Long.BYTES, rowOffset);
case Float:
return new FloatChunkInputStreamGenerator(chunk.asFloatChunk(), Float.BYTES);
return new FloatChunkInputStreamGenerator(chunk.asFloatChunk(), Float.BYTES, rowOffset);
case Double:
return new DoubleChunkInputStreamGenerator(chunk.asDoubleChunk(), Double.BYTES);
return new DoubleChunkInputStreamGenerator(chunk.asDoubleChunk(), Double.BYTES, rowOffset);
case Object:
if (type.isArray()) {
return new VarListChunkInputStreamGenerator<>(type, chunk.asObjectChunk());
return new VarListChunkInputStreamGenerator<>(type, chunk.asObjectChunk(), rowOffset);
}
if (Vector.class.isAssignableFrom(type)) {
//noinspection unchecked
return new VectorChunkInputStreamGenerator((Class<Vector<?>>) type, componentType, chunk.asObjectChunk());
return new VectorChunkInputStreamGenerator(
(Class<Vector<?>>) type, componentType, chunk.asObjectChunk(), rowOffset);
}
if (type == String.class) {
return new VarBinaryChunkInputStreamGenerator<String>(chunk.asObjectChunk(), (out, str) -> {
out.write(str.getBytes(Charsets.UTF_8));
});
return new VarBinaryChunkInputStreamGenerator<String>(chunk.asObjectChunk(), rowOffset,
(out, str) -> out.write(str.getBytes(Charsets.UTF_8)));
}
if (type == BigInteger.class) {
return new VarBinaryChunkInputStreamGenerator<BigInteger>(chunk.asObjectChunk(), (out, item) -> {
out.write(item.toByteArray());
});
return new VarBinaryChunkInputStreamGenerator<BigInteger>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write(item.toByteArray()));
}
if (type == BigDecimal.class) {
return new VarBinaryChunkInputStreamGenerator<BigDecimal>(chunk.asObjectChunk(), (out, item) -> {
final BigDecimal normal = item.stripTrailingZeros();
final int v = normal.scale();
// Write as little endian, arrow endianness.
out.write(0xFF & v);
out.write(0xFF & (v >> 8));
out.write(0xFF & (v >> 16));
out.write(0xFF & (v >> 24));
out.write(normal.unscaledValue().toByteArray());
});
return new VarBinaryChunkInputStreamGenerator<BigDecimal>(chunk.asObjectChunk(), rowOffset,
(out, item) -> {
final BigDecimal normal = item.stripTrailingZeros();
final int v = normal.scale();
// Write as little endian, arrow endianness.
out.write(0xFF & v);
out.write(0xFF & (v >> 8));
out.write(0xFF & (v >> 16));
out.write(0xFF & (v >> 24));
out.write(normal.unscaledValue().toByteArray());
});
}
if (type == DateTime.class) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
Expand All @@ -91,34 +95,33 @@ static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
return new LongChunkInputStreamGenerator(outChunk, Long.BYTES);
return new LongChunkInputStreamGenerator(outChunk, Long.BYTES, rowOffset);
}
if (type == Byte.class) {
return ByteChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk());
return ByteChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Character.class) {
return CharChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk());
return CharChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Double.class) {
return DoubleChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk());
return DoubleChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Float.class) {
return FloatChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk());
return FloatChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Integer.class) {
return IntChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk());
return IntChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Long.class) {
return LongChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk());
return LongChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
if (type == Short.class) {
return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk());
return ShortChunkInputStreamGenerator.convertBoxed(chunk.asObjectChunk(), rowOffset);
}
// TODO (core#936): support column conversion modes

return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), (out, item) -> {
out.write(item.toString().getBytes(Charsets.UTF_8));
});
return new VarBinaryChunkInputStreamGenerator<>(chunk.asObjectChunk(), rowOffset,
(out, item) -> out.write(item.toString().getBytes(Charsets.UTF_8)));
default:
throw new UnsupportedOperationException();
}
Expand Down Expand Up @@ -270,6 +273,16 @@ static WritableChunk<Values> extractChunkFromInputStream(
}
}

/**
* Returns the number of rows that were sent before the first row in this generator.
*/
long getRowOffset();

/**
* Returns the offset of the final row this generator can produce.
*/
long getLastRowOffset();

/**
* Get an input stream optionally position-space filtered using the provided RowSet.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
public class DoubleChunkInputStreamGenerator extends BaseChunkInputStreamGenerator<DoubleChunk<Values>> {
private static final String DEBUG_NAME = "DoubleChunkInputStreamGenerator";

public static DoubleChunkInputStreamGenerator convertBoxed(final ObjectChunk<Double, Values> inChunk) {
public static DoubleChunkInputStreamGenerator convertBoxed(
final ObjectChunk<Double, Values> inChunk, final long rowOffset) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
WritableDoubleChunk<Values> outChunk = WritableDoubleChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
Expand All @@ -44,11 +45,11 @@ public static DoubleChunkInputStreamGenerator convertBoxed(final ObjectChunk<Dou
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new DoubleChunkInputStreamGenerator(outChunk, Double.BYTES);
return new DoubleChunkInputStreamGenerator(outChunk, Double.BYTES, rowOffset);
}

DoubleChunkInputStreamGenerator(final DoubleChunk<Values> chunk, final int elementSize) {
super(chunk, elementSize);
DoubleChunkInputStreamGenerator(final DoubleChunk<Values> chunk, final int elementSize, final long rowOffset) {
super(chunk, elementSize, rowOffset);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
public class FloatChunkInputStreamGenerator extends BaseChunkInputStreamGenerator<FloatChunk<Values>> {
private static final String DEBUG_NAME = "FloatChunkInputStreamGenerator";

public static FloatChunkInputStreamGenerator convertBoxed(final ObjectChunk<Float, Values> inChunk) {
public static FloatChunkInputStreamGenerator convertBoxed(
final ObjectChunk<Float, Values> inChunk, final long rowOffset) {
// This code path is utilized for arrays and vectors of DateTimes, which cannot be reinterpreted.
WritableFloatChunk<Values> outChunk = WritableFloatChunk.makeWritableChunk(inChunk.size());
for (int i = 0; i < inChunk.size(); ++i) {
Expand All @@ -44,11 +45,11 @@ public static FloatChunkInputStreamGenerator convertBoxed(final ObjectChunk<Floa
if (inChunk instanceof PoolableChunk) {
((PoolableChunk) inChunk).close();
}
return new FloatChunkInputStreamGenerator(outChunk, Float.BYTES);
return new FloatChunkInputStreamGenerator(outChunk, Float.BYTES, rowOffset);
}

FloatChunkInputStreamGenerator(final FloatChunk<Values> chunk, final int elementSize) {
super(chunk, elementSize);
FloatChunkInputStreamGenerator(final FloatChunk<Values> chunk, final int elementSize, final long rowOffset) {
super(chunk, elementSize, rowOffset);
}

@Override
Expand Down
Loading