Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-542: Adding dictionary encoding to FileWriter #334

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select a commit. Hold Shift + click to select a range.
d095f3f
ARROW-542: Adding dictionary encoding to file and stream writing
elahrvivaz Feb 16, 2017
e5c8e02
Merging dictionary unloader/loader with arrow writer/reader
elahrvivaz Feb 20, 2017
2f69be1
not passing around dictionary vectors with dictionary fields, adding …
elahrvivaz Feb 28, 2017
363308e
fixing tests
elahrvivaz Mar 9, 2017
568fda5
imports, formatting
elahrvivaz Mar 9, 2017
e567564
adding field size check in vectorschemaroot
elahrvivaz Mar 10, 2017
92a1e6f
fixing imports
elahrvivaz Mar 10, 2017
43c28af
adding test for nested dictionary encoded list
elahrvivaz Mar 10, 2017
a1508b9
removing dictionary vector method (instead use field.dictionary)
elahrvivaz Mar 10, 2017
682db6f
cleanup
elahrvivaz Mar 10, 2017
45caa02
reverting basewriter dictionary methods
elahrvivaz Mar 10, 2017
95c7b2a
cleanup
elahrvivaz Mar 10, 2017
db9a007
adding dictionary tests to echo server
elahrvivaz Mar 10, 2017
127937f
removing qualifier for magic
elahrvivaz Mar 10, 2017
8366288
making magic array private
elahrvivaz Mar 10, 2017
adec200
making arrow magic static, cleanup
elahrvivaz Mar 13, 2017
2ee7cfb
fixing FileToStream conversion
elahrvivaz Mar 15, 2017
a24854b
fixing StreamToFile conversion
elahrvivaz Mar 15, 2017
bde4eee
Handle 0-length message indicator for EOS in C++ StreamReader
wesm Mar 15, 2017
70639e0
restoring vector loader test
elahrvivaz Mar 15, 2017
1679934
cleaning up license
elahrvivaz Mar 15, 2017
00d78d3
fixing set bit validity value in NullableMapVector load
elahrvivaz Mar 16, 2017
5339730
fixing bitvector load of value count, adding struct integration test
elahrvivaz Mar 16, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ class StreamReader::StreamReaderImpl {

int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());

if (message_length == 0) {
// Optional 0 EOS control message
*message = nullptr;
return Status::OK();
}

RETURN_NOT_OK(stream_->Read(message_length, &buffer));
if (buffer->size() != message_length) {
return Status::IOError("Unexpected end of stream trying to read message");
Expand Down
4 changes: 4 additions & 0 deletions integration/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,12 +680,16 @@ def stream_to_file(self, stream_path, file_path):
cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
'org.apache.arrow.tools.StreamToFile',
stream_path, file_path]
if self.debug:
print(' '.join(cmd))
run_cmd(cmd)

def file_to_stream(self, file_path, stream_path):
    """Convert an Arrow file to an Arrow stream format.

    Shells out to the Java FileToStream tool bundled in
    self.ARROW_TOOLS_JAR rather than converting in-process.

    :param file_path: path of the input Arrow file to read
    :param stream_path: path the converted Arrow stream is written to
    """
    cmd = ['java', '-cp', self.ARROW_TOOLS_JAR,
           'org.apache.arrow.tools.FileToStream',
           file_path, stream_path]
    # Echo the exact command line when debugging so a failing
    # conversion can be re-run by hand outside the test harness.
    if self.debug:
        print(' '.join(cmd))
    run_cmd(cmd)


Expand Down
48 changes: 21 additions & 27 deletions java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@
package org.apache.arrow.tools;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

import com.google.common.base.Preconditions;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.stream.ArrowStreamReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

public class EchoServer {
private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class);

Expand All @@ -57,30 +53,28 @@ public ClientConnection(Socket socket) {

public void run() throws IOException {
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
List<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>();
try (
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
) {
// Read the entire input stream.
reader.init();
while (true) {
ArrowRecordBatch batch = reader.nextRecordBatch();
if (batch == null) break;
batches.add(batch);
}
LOGGER.info(String.format("Received %d batches", batches.size()));

// Write it back
try (ArrowStreamWriter writer = new ArrowStreamWriter(out, reader.getSchema())) {
for (ArrowRecordBatch batch: batches) {
writer.writeRecordBatch(batch);
// Read the entire input stream and write it back
try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
// load the first batch before instantiating the writer so that we have any dictionaries
reader.loadNextBatch();
try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, socket.getOutputStream())) {
writer.start();
int echoed = 0;
while (true) {
int rowCount = reader.getVectorSchemaRoot().getRowCount();
if (rowCount == 0) {
break;
} else {
writer.writeBatch();
echoed += rowCount;
reader.loadNextBatch();
}
}
writer.end();
Preconditions.checkState(reader.bytesRead() == writer.bytesWritten());
LOGGER.info(String.format("Echoed %d records", echoed));
}
LOGGER.info("Done writing stream back.");
}
}

Expand Down
48 changes: 17 additions & 31 deletions java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,12 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.file.ArrowReader;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
Expand Down Expand Up @@ -86,35 +80,27 @@ int run(String[] args) {
File inFile = validateFile("input", inFileName);
File outFile = validateFile("output", outFileName);
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close
try(
FileInputStream fileInputStream = new FileInputStream(inFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
try (FileInputStream fileInputStream = new FileInputStream(inFile);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {

ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
LOGGER.debug("Input file size: " + inFile.length());
LOGGER.debug("Found schema: " + schema);

try (
FileOutputStream fileOutputStream = new FileOutputStream(outFile);
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
) {

// initialize vectors

List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {

VectorLoader vectorLoader = new VectorLoader(root);
vectorLoader.load(inRecordBatch);

VectorUnloader vectorUnloader = new VectorUnloader(root);
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
arrowWriter.writeRecordBatch(recordBatch);
try (FileOutputStream fileOutputStream = new FileOutputStream(outFile);
ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, fileOutputStream.getChannel())) {
arrowWriter.start();
while (true) {
arrowReader.loadNextBatch();
int loaded = root.getRowCount();
if (loaded == 0) {
break;
} else {
arrowWriter.writeBatch();
}
}
arrowWriter.end();
}
LOGGER.debug("Output file size: " + outFile.length());
}
Expand Down
27 changes: 13 additions & 14 deletions java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,29 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.file.ArrowReader;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;

/**
* Converts an Arrow file to an Arrow stream. The file should be specified as the
* first argument and the output is written to standard out.
*/
public class FileToStream {

public static void convert(FileInputStream in, OutputStream out) throws IOException {
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
try(
ArrowReader reader = new ArrowReader(in.getChannel(), allocator);) {
ArrowFooter footer = reader.readFooter();
try (
ArrowStreamWriter writer = new ArrowStreamWriter(out, footer.getSchema());
) {
for (ArrowBlock block: footer.getRecordBatches()) {
try (ArrowRecordBatch batch = reader.readRecordBatch(block)) {
writer.writeRecordBatch(batch);
}
try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
// load the first batch before instantiating the writer so that we have any dictionaries
reader.loadNextBatch();
try (ArrowStreamWriter writer = new ArrowStreamWriter(root, reader, out)) {
writer.start();
while (root.getRowCount() > 0) {
writer.writeBatch();
reader.loadNextBatch();
}
writer.end();
}
}
}
Expand Down
83 changes: 31 additions & 52 deletions java/tools/src/main/java/org/apache/arrow/tools/Integration.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,12 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.file.ArrowReader;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.file.json.JsonFileReader;
import org.apache.arrow.vector.file.json.JsonFileWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Validator;
import org.apache.commons.cli.CommandLine;
Expand Down Expand Up @@ -69,24 +65,18 @@ enum Command {
ARROW_TO_JSON(true, false) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try(
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
try(BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
LOGGER.debug("Input file size: " + arrowFile.length());
LOGGER.debug("Found schema: " + schema);
try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true));) {
try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true))) {
writer.start(schema);
List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
VectorLoader vectorLoader = new VectorLoader(root);
vectorLoader.load(inRecordBatch);
writer.write(root);
}
for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
arrowReader.loadRecordBatch(rbBlock);
writer.write(root);
}
}
LOGGER.debug("Output file size: " + jsonFile.length());
Expand All @@ -96,27 +86,22 @@ public void execute(File arrowFile, File jsonFile) throws IOException {
JSON_TO_ARROW(false, true) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try (
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader reader = new JsonFileReader(jsonFile, allocator);
) {
try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) {
Schema schema = reader.start();
LOGGER.debug("Input file size: " + jsonFile.length());
LOGGER.debug("Found schema: " + schema);
try (
FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
) {

// initialize vectors
VectorSchemaRoot root;
while ((root = reader.read()) != null) {
VectorUnloader vectorUnloader = new VectorUnloader(root);
try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();) {
arrowWriter.writeRecordBatch(recordBatch);
}
root.close();
try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
// TODO json dictionaries
ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
arrowWriter.start();
reader.read(root);
while (root.getRowCount() != 0) {
arrowWriter.writeBatch();
reader.read(root);
}
arrowWriter.end();
}
LOGGER.debug("Output file size: " + arrowFile.length());
}
Expand All @@ -125,32 +110,26 @@ public void execute(File arrowFile, File jsonFile) throws IOException {
VALIDATE(true, true) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try (
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);
) {
try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
Schema jsonSchema = jsonReader.start();
ArrowFooter footer = arrowReader.readFooter();
Schema arrowSchema = footer.getSchema();
VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot();
Schema arrowSchema = arrowRoot.getSchema();
LOGGER.debug("Arrow Input file size: " + arrowFile.length());
LOGGER.debug("ARROW schema: " + arrowSchema);
LOGGER.debug("JSON Input file size: " + jsonFile.length());
LOGGER.debug("JSON schema: " + jsonSchema);
Validator.compareSchemas(jsonSchema, arrowSchema);

List<ArrowBlock> recordBatches = footer.getRecordBatches();
List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks();
Iterator<ArrowBlock> iterator = recordBatches.iterator();
VectorSchemaRoot jsonRoot;
while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) {
ArrowBlock rbBlock = iterator.next();
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema, allocator);) {
VectorLoader vectorLoader = new VectorLoader(arrowRoot);
vectorLoader.load(inRecordBatch);
Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
}
arrowReader.loadRecordBatch(rbBlock);
Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
jsonRoot.close();
}
boolean hasMoreJSON = jsonRoot != null;
Expand Down
19 changes: 11 additions & 8 deletions java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.stream.ArrowStreamReader;

/**
Expand All @@ -38,13 +38,16 @@ public class StreamToFile {
public static void convert(InputStream in, OutputStream out) throws IOException {
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
reader.init();
try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), reader.getSchema());) {
while (true) {
ArrowRecordBatch batch = reader.nextRecordBatch();
if (batch == null) break;
writer.writeRecordBatch(batch);
VectorSchemaRoot root = reader.getVectorSchemaRoot();
// load the first batch before instantiating the writer so that we have any dictionaries
reader.loadNextBatch();
try (ArrowFileWriter writer = new ArrowFileWriter(root, reader, Channels.newChannel(out))) {
writer.start();
while (root.getRowCount() > 0) {
writer.writeBatch();
reader.loadNextBatch();
}
writer.end();
}
}
}
Expand Down
Loading