Skip to content

Commit

Permalink
Add comments and fix CI check failure
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Dec 17, 2022
1 parent fba917a commit 04a1adf
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 15 deletions.
1 change: 1 addition & 0 deletions docker/runtime-base/src/main/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ FROM deephaven/server-base:local-build

COPY wheels/ /wheels
RUN set -eux; \
python -m pip install pyarrow==10.0.1; \
python -m pip install -q --no-index --no-cache-dir /wheels/*.whl; \
rm -r /wheels
2 changes: 1 addition & 1 deletion py/server/deephaven/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def to_table(pa_table: pa.Table, cols: List[str] = None) -> Table:

try:
pa_buffer = dh_schema.serialize()
j_barrage_table_builder.addSchema(pa_buffer.to_pybytes())
j_barrage_table_builder.setSchema(pa_buffer.to_pybytes())

record_batches = pa_table.to_batches()
for rb in record_batches:
Expand Down
19 changes: 19 additions & 0 deletions py/server/tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,25 @@ def test_round_trip_types(self):
dh_table_rt = dharrow.to_table(pa_table)
self.assert_table_equals(dh_table, dh_table_rt)

def test_round_trip_empty(self):
cols = [
# bool_col(name="Boolean", data=[True, False]),
byte_col(name="Byte", data=()),
# char_col(name="Char", data='-1'),
# short_col(name="Short", data=[1, -1]),
# int_col(name="Int", data=[1, -1]),
# long_col(name="Long", data=[1, -1]),
# long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)),
# float_col(name="Float", data=[1.01, -1.01]),
# double_col(name="Double", data=[1.01, -1.01]),
# string_col(name="String", data=["foo", "bar"]),
# datetime_col(name="Datetime", data=[dtypes.DateTime(1), dtypes.DateTime(-1)]),
]
dh_table = new_table(cols=cols)
pa_table = dharrow.to_arrow(dh_table)
dh_table_rt = dharrow.to_table(pa_table)
self.assert_table_equals(dh_table, dh_table_rt)


if __name__ == '__main__':
unittest.main()
36 changes: 23 additions & 13 deletions server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,12 @@ private static void createAndSendSnapshot(BaseTable table, BitSet columns, RowSe
}
}

/**
* This class allows the incremental making of a BarrageTable from Arrow IPC messages, starting with an Arrow Schema
* message followed by zero or more RecordBatches
*/
@SuppressWarnings("unused")
public static class ArrowToTableConverter {

long totalRowsRead = 0;
BarrageTable resultTable;
ChunkType[] columnChunkTypes;
Expand All @@ -278,7 +281,7 @@ public static class ArrowToTableConverter {
Class<?>[] componentTypes;
BarrageSubscriptionOptions options = DEFAULT_SER_OPTIONS;

private static MessageInfo parseArrowIpcMessage(final byte[] ipcMessage, boolean isSchema) throws IOException {
private static MessageInfo parseArrowIpcMessage(final byte[] ipcMessage) throws IOException {
final MessageInfo mi = new MessageInfo();

final ByteBuffer bb = ByteBuffer.wrap(ipcMessage);
Expand All @@ -287,7 +290,11 @@ private static MessageInfo parseArrowIpcMessage(final byte[] ipcMessage, boolean
final int metadata_size = bb.getInt();
mi.header = Message.getRootAsMessage(bb);

if (!isSchema) {
if (mi.header == null) {
throw new IllegalArgumentException("The input is not a valid Arrow IPC message");
}

if (mi.header.headerType() == MessageHeader.RecordBatch) {
bb.position(metadata_size + 8);
final ByteBuffer bodyBB = bb.slice();
final ByteBufferInputStream bbis = new ByteBufferInputStream(bodyBB);
Expand All @@ -298,10 +305,10 @@ private static MessageInfo parseArrowIpcMessage(final byte[] ipcMessage, boolean
return mi;
}

public void addSchema(final byte[] ipcMessage) {
public void setSchema(final byte[] ipcMessage) {
final MessageInfo mi = getMessageInfo(ipcMessage);
if (mi.header == null || mi.header.headerType() != MessageHeader.Schema) {
throw new IllegalArgumentException("The input is not a valid Arrow schema IPC message");
if (mi.header.headerType() != MessageHeader.Schema) {
throw new IllegalArgumentException("The input is not a valid Arrow Schema IPC message");
}
parseSchema((Schema) mi.header.header(new Schema()));
}
Expand All @@ -312,8 +319,7 @@ public void addRecordBatch(final byte[] ipcMessage) {
}

final MessageInfo mi = getMessageInfo(ipcMessage);

if (mi.header == null || mi.header.headerType() != MessageHeader.RecordBatch) {
if (mi.header.headerType() != MessageHeader.RecordBatch) {
throw new IllegalArgumentException("The input is not a valid Arrow RecordBatch IPC message");
}

Expand All @@ -335,7 +341,7 @@ public BarrageTable getResultTable() {
private MessageInfo getMessageInfo(byte[] ipcMessage) {
final MessageInfo mi;
try {
mi = parseArrowIpcMessage(ipcMessage, false);
mi = parseArrowIpcMessage(ipcMessage);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -1012,6 +1018,10 @@ public synchronized void close() {
}


/**
* This class produces Arrow Ipc Messages for a Deephaven table, including the schema and all columnar data. The data
* is split into chunks and returned as multiple Arrow RecordBatch messages.
*/
@SuppressWarnings("unused")
public static class TableToArrowConverter {
private final BaseTable table;
Expand All @@ -1022,7 +1032,7 @@ public TableToArrowConverter(BaseTable table) {
this.table = table;
}

private void buildBatches() {
private void buildRecordBatches() {
if (arrowBuilderObserver == null) {
final BarragePerformanceLog.SnapshotMetricsHelper metrics =
new BarragePerformanceLog.SnapshotMetricsHelper();
Expand All @@ -1040,19 +1050,19 @@ public byte[] getSchema() {
}

public boolean hasMore() {
buildBatches();
buildRecordBatches();
return !arrowBuilderObserver.batchMessages.isEmpty();
}

public byte[] next() {
buildBatches();
buildRecordBatches();
if (arrowBuilderObserver.batchMessages.isEmpty()) {
throw new IllegalStateException("There is no more RecordBatch for the table");
}
return arrowBuilderObserver.batchMessages.pop();
}

public static class ArrowBuilderObserver implements StreamObserver<BarrageStreamGenerator.View> {
private static class ArrowBuilderObserver implements StreamObserver<BarrageStreamGenerator.View> {
volatile boolean completed = false;

final Deque<byte[]> batchMessages = new ArrayDeque<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@

public class BarrageStreamGenerator implements
BarrageMessageProducer.StreamGenerator<BarrageStreamGenerator.View> {
/**
* This sub-generator is used by the TableToArrowConverter to write data in plain Arrow Ipc format without wrapping
* decorating them with Arrow Flight metadata first
*/
public static class ArrowStreamGeneratr extends BarrageStreamGenerator {

public ArrowStreamGeneratr(BarrageMessage message, BarragePerformanceLog.WriteMetricsConsumer metricsConsumer) {
super(message, metricsConsumer);
}
Expand Down

0 comments on commit 04a1adf

Please sign in to comment.