Skip to content

Commit

Permalink
fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Mar 9, 2017
1 parent 2f69be1 commit 363308e
Show file tree
Hide file tree
Showing 25 changed files with 479 additions and 440 deletions.
26 changes: 11 additions & 15 deletions java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,21 @@
*/
package org.apache.arrow.tools;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import com.google.common.base.Preconditions;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.ArrowStreamReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class EchoServer {
private static final Logger LOGGER = LoggerFactory.getLogger(EchoServer.class);

Expand All @@ -57,22 +54,21 @@ public ClientConnection(Socket socket) {

public void run() throws IOException {
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
List<ArrowRecordBatch> batches = new ArrayList<>();
List<ArrowDictionaryBatch> dictionaries = new ArrayList<>();
try (
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
ArrowStreamReader reader = new ArrowStreamReader(in, allocator);
ArrowStreamWriter writer = new ArrowStreamWriter(reader.getSchema().getFields(), reader.getVectors(), out)) {
ArrowStreamWriter writer = new ArrowStreamWriter(reader.getVectorSchemaRoot(), reader, out)) {
// Read the entire input stream and write it back
writer.start();
int echoed = 0;
while (true) {
int loaded = reader.loadNextBatch();
reader.loadNextBatch();
int loaded = reader.getVectorSchemaRoot().getRowCount();
if (loaded == 0) {
break;
} else {
writer.writeBatch(loaded);
writer.writeBatch();
echoed += loaded;
}
}
Expand Down
25 changes: 13 additions & 12 deletions java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
*/
package org.apache.arrow.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
Expand All @@ -32,12 +38,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

public class FileRoundtrip {
private static final Logger LOGGER = LoggerFactory.getLogger(FileRoundtrip.class);

Expand Down Expand Up @@ -83,20 +83,21 @@ int run(String[] args) {
try (FileInputStream fileInputStream = new FileInputStream(inFile);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {

ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
LOGGER.debug("Input file size: " + inFile.length());
LOGGER.debug("Found schema: " + schema);

try (FileOutputStream fileOutputStream = new FileOutputStream(outFile);
ArrowFileWriter arrowWriter = new ArrowFileWriter(schema.getFields(), arrowReader.getVectors(), fileOutputStream.getChannel())) {
ArrowFileWriter arrowWriter = new ArrowFileWriter(root, arrowReader, fileOutputStream.getChannel())) {
arrowWriter.start();
while (true) {
int loaded = arrowReader.loadNextBatch();
arrowReader.loadNextBatch();
int loaded = root.getRowCount();
if (loaded == 0) {
break;
} else {
arrowWriter.writeBatch(loaded);
arrowWriter.writeBatch();
}
}
arrowWriter.end();
Expand Down
23 changes: 11 additions & 12 deletions java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,31 @@
*/
package org.apache.arrow.tools;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.stream.ArrowStreamWriter;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;

/**
* Converts an Arrow file to an Arrow stream. The file should be specified as the
* first argument and the output is written to standard out.
*/
public class FileToStream {

public static void convert(FileInputStream in, OutputStream out) throws IOException {
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
try (ArrowFileReader reader = new ArrowFileReader(in.getChannel(), allocator)) {
ArrowFooter footer = reader.readFooter();
try (ArrowStreamWriter writer = new ArrowStreamWriter(footer.getSchema().getFields(), reader.getVectors(), out)) {
for (ArrowBlock block: footer.getRecordBatches()) {
int loaded = reader.loadRecordBatch(block);
writer.writeBatch(loaded);
try (ArrowStreamWriter writer = new ArrowStreamWriter(reader.getVectorSchemaRoot(), reader, out)) {
for (ArrowBlock block: reader.getRecordBlocks()) {
reader.loadRecordBatch(block);
writer.writeBatch();
}
}
}
Expand Down
66 changes: 25 additions & 41 deletions java/tools/src/main/java/org/apache/arrow/tools/Integration.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,17 @@ enum Command {
ARROW_TO_JSON(true, false) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try(
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator);) {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
try(BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
LOGGER.debug("Input file size: " + arrowFile.length());
LOGGER.debug("Found schema: " + schema);
try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true));) {
VectorSchemaRoot root = new VectorSchemaRoot(footer.getSchema().getFields(), arrowReader.getVectors());
try (JsonFileWriter writer = new JsonFileWriter(jsonFile, JsonFileWriter.config().pretty(true))) {
writer.start(schema);
List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
int loaded = arrowReader.loadRecordBatch(rbBlock);
root.setRowCount(loaded);
for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
arrowReader.loadRecordBatch(rbBlock);
writer.write(root);
}
}
Expand All @@ -97,28 +93,20 @@ public void execute(File arrowFile, File jsonFile) throws IOException {
JSON_TO_ARROW(false, true) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try (
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader reader = new JsonFileReader(jsonFile, allocator);
) {
try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader reader = new JsonFileReader(jsonFile, allocator)) {
Schema schema = reader.start();
LOGGER.debug("Input file size: " + jsonFile.length());
LOGGER.debug("Found schema: " + schema);
try (FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
ArrowFileWriter arrowWriter = new ArrowFileWriter(schema, fileOutputStream.getChannel(), allocator)) {
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
// TODO json dictionaries
ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
arrowWriter.start();
// initialize vectors
VectorSchemaRoot root;
while ((root = reader.read()) != null) {
List<FieldVector> rootVectors = root.getFieldVectors();
for (int i = 0; i < rootVectors.size(); i++) {
FieldVector from = rootVectors.get(i);
FieldVector to = arrowWriter.getVectors().get(i);
TransferPair transfer = from.makeTransferPair(to);
transfer.transfer();
}
arrowWriter.writeBatch(root.getRowCount());
root.close();
reader.read(root);
while (root.getRowCount() != 0) {
arrowWriter.writeBatch();
reader.read(root);
}
arrowWriter.end();
}
Expand All @@ -129,29 +117,25 @@ public void execute(File arrowFile, File jsonFile) throws IOException {
VALIDATE(true, true) {
@Override
public void execute(File arrowFile, File jsonFile) throws IOException {
try (
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator);
) {
try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
FileInputStream fileInputStream = new FileInputStream(arrowFile);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) {
Schema jsonSchema = jsonReader.start();
ArrowFooter footer = arrowReader.readFooter();
Schema arrowSchema = footer.getSchema();
VectorSchemaRoot arrowRoot = arrowReader.getVectorSchemaRoot();
Schema arrowSchema = arrowRoot.getSchema();
LOGGER.debug("Arrow Input file size: " + arrowFile.length());
LOGGER.debug("ARROW schema: " + arrowSchema);
LOGGER.debug("JSON Input file size: " + jsonFile.length());
LOGGER.debug("JSON schema: " + jsonSchema);
Validator.compareSchemas(jsonSchema, arrowSchema);

List<ArrowBlock> recordBatches = footer.getRecordBatches();
List<ArrowBlock> recordBatches = arrowReader.getRecordBlocks();
Iterator<ArrowBlock> iterator = recordBatches.iterator();
VectorSchemaRoot jsonRoot;
VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema.getFields(), arrowReader.getVectors());
while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) {
ArrowBlock rbBlock = iterator.next();
int loaded = arrowReader.loadRecordBatch(rbBlock);
arrowRoot.setRowCount(loaded);
arrowReader.loadRecordBatch(rbBlock);
Validator.compareVectorSchemaRoot(arrowRoot, jsonRoot);
jsonRoot.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@
*/
package org.apache.arrow.tools;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.stream.ArrowStreamReader;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
Expand All @@ -30,21 +25,26 @@
import java.io.OutputStream;
import java.nio.channels.Channels;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.stream.ArrowStreamReader;

/**
* Converts an Arrow stream to an Arrow file.
*/
public class StreamToFile {
public static void convert(InputStream in, OutputStream out) throws IOException {
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
try (ArrowFileWriter writer = new ArrowFileWriter(reader.getSchema().getFields(), reader.getVectors(), Channels.newChannel(out))) {
try (ArrowFileWriter writer = new ArrowFileWriter(reader.getVectorSchemaRoot(), reader, Channels.newChannel(out))) {
writer.start();
while (true) {
int loaded = reader.loadNextBatch();
if (loaded == 0) {
reader.loadNextBatch();
if (reader.getVectorSchemaRoot().getRowCount() == 0) {
break;
}
writer.writeBatch(loaded);
writer.writeBatch();
}
writer.end();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
*/
package org.apache.arrow.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
Expand All @@ -30,17 +36,9 @@
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFileReader;
import org.apache.arrow.vector.file.ArrowFileWriter;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;

public class ArrowFileTestFixtures {
static final int COUNT = 10;

Expand All @@ -63,12 +61,10 @@ static void validateOutput(File testOutFile, BufferAllocator allocator) throws E
try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(testOutFile);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
VectorSchemaRoot root = new VectorSchemaRoot(schema.getFields(), arrowReader.getVectors());
for (ArrowBlock rbBlock : footer.getRecordBatches()) {
int loaded = arrowReader.loadRecordBatch(rbBlock);
root.setRowCount(loaded);
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
arrowReader.loadRecordBatch(rbBlock);
validateContent(COUNT, root);
}
}
Expand All @@ -83,12 +79,10 @@ static void validateContent(int count, VectorSchemaRoot root) {
}

static void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
Schema schema = new Schema(parent.getField().getChildren());
int valueCount = parent.getAccessor().getValueCount();
List<FieldVector> vectors = parent.getChildrenFromFields();
VectorSchemaRoot root = new VectorSchemaRoot(parent);
try (FileOutputStream fileOutputStream = new FileOutputStream(file);
ArrowFileWriter arrowWriter = new ArrowFileWriter(schema.getFields(), vectors, fileOutputStream.getChannel())) {
arrowWriter.writeBatch(valueCount);
ArrowFileWriter arrowWriter = new ArrowFileWriter(root, null, fileOutputStream.getChannel())) {
arrowWriter.writeBatch();
}
}

Expand Down
Loading

0 comments on commit 363308e

Please sign in to comment.