Refactor to fix stringify coming first; add an explicit comment that stringify should come last. Extract enums for ChunkReaders that don't need to be dynamic.
devinrsmith committed Oct 24, 2024
1 parent 6e3395c commit 4e48079
Showing 1 changed file with 124 additions and 41 deletions.
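For context, the core of the refactor replaces per-call lambdas with stateless enum singletons. The following is a minimal sketch of that pattern using a simplified stand-in interface, not the real ChunkReader signature (the actual readChunk method takes field-node and buffer iterators, a DataInput, and chunk/offset/row parameters):

// Simplified stand-in for the real ChunkReader contract (illustrative only).
interface SimpleReader {
    String read(byte[] buf, int off, int len);
}

// A stateless reader can be a single shared enum constant instead of a lambda
// captured on every getReader() call.
enum Utf8Reader implements SimpleReader {
    INSTANCE;

    @Override
    public String read(byte[] buf, int off, int len) {
        return new String(buf, off, len, java.nio.charset.StandardCharsets.UTF_8);
    }
}

final class ReaderDispatch {
    static SimpleReader readerFor(Class<?> type) {
        if (type == String.class) {
            return Utf8Reader.INSTANCE; // shared singleton, no per-call allocation
        }
        throw new UnsupportedOperationException("Unsupported type: " + type);
    }
}

The added enums in the diff below (ByteArrayChunkReader, BigIntegerChunkReader, BigDecimalChunkReader, StringChunkReader, SchemaChunkReader) apply this same idea against the real ChunkReader interface.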
@@ -4,6 +4,8 @@
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.ColumnConversionMode;
import io.deephaven.extensions.barrage.util.ArrowIpcUtil;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
@@ -13,13 +15,17 @@
import io.deephaven.vector.Vector;
import org.apache.arrow.vector.types.pojo.Schema;

import java.io.DataInput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Iterator;
import java.util.PrimitiveIterator;

import static io.deephaven.extensions.barrage.chunk.ChunkInputStreamGenerator.MS_PER_DAY;

@@ -62,13 +68,7 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
case Object:
if (typeInfo.type().isArray()) {
if (typeInfo.componentType() == byte.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(buf, off, len) -> Arrays.copyOfRange(buf, off, off + len),
outChunk, outOffset, totalRows);
return ByteArrayChunkReader.BYTEARRAY_READER;
} else {
return new VarListChunkReader<>(options, typeInfo, this);
}
@@ -77,30 +77,10 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
return new VectorChunkReader(options, typeInfo, this);
}
if (typeInfo.type() == BigInteger.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigInteger::new,
outChunk, outOffset, totalRows);
return BigIntegerChunkReader.BIG_INTEGER_CHUNK_READER;
}
if (typeInfo.type() == BigDecimal.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(final byte[] buf, final int offset, final int length) -> {
// read the int scale value as little endian, arrow's endianness.
final byte b1 = buf[offset];
final byte b2 = buf[offset + 1];
final byte b3 = buf[offset + 2];
final byte b4 = buf[offset + 3];
final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
},
outChunk, outOffset, totalRows);
return BigDecimalChunkReader.BIG_DECIMAL_CHUNK_READER;
}
if (typeInfo.type() == Instant.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
@@ -186,22 +166,17 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
return new LongChunkReader(options).transform(
value -> value == QueryConstants.NULL_LONG ? null : LocalTime.ofNanoOfDay(value));
}
if (typeInfo.type() == String.class ||
options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is,
fieldNodeIter,
bufferInfoIter,
(buf, off, len) -> new String(buf, off, len, Charsets.UTF_8), outChunk, outOffset,
totalRows);
if (typeInfo.type() == String.class) {
return StringChunkReader.STRING_CHUNK_READER;
}
// TODO (core#58): add custom barrage serialization/deserialization support
// // Migrate Schema to custom format when available.
if (typeInfo.type() == Schema.class) {
return (fieldNodeIter, bufferInfoIter, is, outChunk, outOffset,
totalRows) -> VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is,
fieldNodeIter, bufferInfoIter, ArrowIpcUtil::deserialize, outChunk, outOffset,
totalRows);
return SchemaChunkReader.SCHEMA_CHUNK_READER;
}
// Note: this Stringify check should come last
if (options.columnConversionMode().equals(ColumnConversionMode.Stringify)) {
return StringChunkReader.STRING_CHUNK_READER;
}
// TODO (core#936): support column conversion modes
throw new UnsupportedOperationException(
@@ -210,4 +185,112 @@ public ChunkReader getReader(StreamReaderOptions options, int factor,
throw new UnsupportedOperationException();
}
}

private enum ByteArrayChunkReader implements ChunkReader {
BYTEARRAY_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
ByteArrayChunkReader::readBytes,
outChunk,
outOffset,
totalRows);
}

private static byte[] readBytes(byte[] buf, int off, int len) {
return Arrays.copyOfRange(buf, off, off + len);
}
}

private enum BigIntegerChunkReader implements ChunkReader {
BIG_INTEGER_CHUNK_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigInteger::new,
outChunk,
outOffset,
totalRows);
}
}

private enum BigDecimalChunkReader implements ChunkReader {
BIG_DECIMAL_CHUNK_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigDecimalChunkReader::readBigDecimal,
outChunk,
outOffset,
totalRows);
}

private static BigDecimal readBigDecimal(byte[] buf, int offset, int length) {
// read the int scale value as little endian, arrow's endianness.
final byte b1 = buf[offset];
final byte b2 = buf[offset + 1];
final byte b3 = buf[offset + 2];
final byte b4 = buf[offset + 3];
final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
}
}

private enum StringChunkReader implements ChunkReader {
STRING_CHUNK_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
StringChunkReader::readString,
outChunk,
outOffset,
totalRows);
}

private static String readString(byte[] buf, int off, int len) {
return new String(buf, off, len, Charsets.UTF_8);
}
}

private enum SchemaChunkReader implements ChunkReader {
SCHEMA_CHUNK_READER;

@Override
public WritableChunk<Values> readChunk(Iterator<ChunkInputStreamGenerator.FieldNodeInfo> fieldNodeIter,
PrimitiveIterator.OfLong bufferInfoIter, DataInput is, WritableChunk<Values> outChunk, int outOffset,
int totalRows) throws IOException {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
ArrowIpcUtil::deserialize,
outChunk,
outOffset,
totalRows);
}
}
}
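A note on the ordering change: the Stringify fallback has to be evaluated only after every type-specific branch, otherwise a supported type such as Schema would be stringified whenever ColumnConversionMode.Stringify is set. The following is an illustrative sketch of the corrected control flow with simplified names, not the actual getReader body:

// Illustrative only: type-specific readers are selected first; the Stringify
// fallback is consulted last, mirroring the comment added in this commit.
final class ReaderOrdering {
    static String chooseReader(Class<?> type, boolean stringifyMode) {
        if (type == String.class) {
            return "STRING_CHUNK_READER";      // dedicated String reader
        }
        if (type == java.math.BigInteger.class) {
            return "BIG_INTEGER_CHUNK_READER"; // dedicated BigInteger reader
        }
        // Note: this Stringify check should come last, so it never shadows a
        // dedicated reader for a supported type.
        if (stringifyMode) {
            return "STRING_CHUNK_READER";
        }
        throw new UnsupportedOperationException("Unsupported type: " + type);
    }
}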
