Reading and writing Apache Arrow file format in Java

November 2018 update with a performance investigation: part 1, part 2, part 3.

Index

  1. Introduction
  2. Example project setup and dependencies
  3. Apache Arrow Java writer
  4. Apache Arrow Java reader
  5. Writing custom WritableByteChannel and SeekableReadChannel
  6. Example Apache Parquet to Arrow convertor

Introduction

In this blog post I am going to show how to write and read Apache Arrow files in a stand-alone Java program.

Apache Arrow (https://arrow.apache.org/) is a popular in-memory columnar storage format. It is to memory what Parquet/ORC are to disk-oriented columnar storage formats. The goal of the project is to standardize in-memory columnar data representations for all data processing engines (e.g., Spark, Drill, Impala, etc.). This standardization helps to reduce communication and serialization overheads, increases the shared code base for managing data (e.g., reading Parquet files into the Arrow format), and promises to improve performance as well. For more details about Arrow, please refer to the website.

Apache Arrow JavaDocs are available here.

Example project setup and dependencies

All the code I am going to discuss here is available as a maven project at https://github.com/animeshtrivedi/ArrowExample

You can specify the Arrow Maven dependencies in your project by putting these in the pom file.

<dependencies>
 <dependency>
  <groupId>org.apache.arrow</groupId>
  <artifactId>arrow-memory</artifactId>
  <version>0.8.0</version>
 </dependency>

 <dependency>
  <groupId>org.apache.arrow</groupId>
  <artifactId>arrow-vector</artifactId>
  <version>0.8.0</version>
 </dependency>
</dependencies>

I highly recommend using the 0.8 release of Arrow. But if you are interested in how it can be done with the 0.7 release, then check the 0.7-example branch in the GitHub project.

Apache Arrow Java writer

We first start with the pertinent writer-side objects that can be found in the ArrowWrite.java class in the project at https://github.com/animeshtrivedi/ArrowExample/blob/master/src/main/java/com/github/animeshtrivedi/arrowexample/ArrowWrite.java

How to make a schema

We start with how to define an Arrow schema on the writer side. This is done by defining a list of Field objects and passing them to make a Schema object. A Field object takes a string name, a type, and an additional children parameter if the type is a complex type (like list, map, etc.). We will be covering simple primitive types in this blog post. A field of type integer can be defined as:

Field intField = new Field("int", FieldType.nullable(new ArrowType.Int(32, true)), null);

In this example, 32 gives the bit-width and true says that it is a signed integer. The last null indicates that it has no complex children. In the ArrowWrite class, there are more examples of how to define long, float, double, etc. Another important primitive type to define is the byte[] type (any arbitrary data type can be serialized to and de-serialized from a byte[]). A byte[] field is defined as:

Field binaryField = new Field("binary", FieldType.nullable(new ArrowType.Binary()), null);
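
For reference, other primitive fields follow the same pattern. Here is a minimal sketch of long and double fields (the field names are just illustrative; the FloatingPointPrecision enum is part of Arrow's vector types):

Field longField = new Field("long", FieldType.nullable(new ArrowType.Int(64, true)), null);
Field doubleField = new Field("double",
        FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null);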

Having defined multiple Fields, we now have to stitch them together to make a schema. This can be done as simply as

ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
childrenBuilder.add(intField);
childrenBuilder.add(binaryField);
...
new Schema(childrenBuilder.build(), null);

The Schema constructor only requires an iterable of fields and an optional metadata map:

Schema(Iterable<Field> fields, Map<String, String> metadata);

How to write data in Arrow

Setup

The next important class to understand is VectorSchemaRoot. It is responsible for managing reader and writer interfaces, memory allocation, and more. So, we first allocate a VectorSchemaRoot object as

VectorSchemaRoot root = VectorSchemaRoot.create(schema, new RootAllocator(Integer.MAX_VALUE));

Arrow file I/O interfaces are defined in the ArrowFileReader and ArrowFileWriter classes. We need a writer to write a file. Its constructor is defined as:

public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out)

We already have the VectorSchemaRoot. A DictionaryProvider can be allocated by simply calling new DictionaryProvider.MapDictionaryProvider(). Dictionaries are important when passing a reader's data to another writer, where the data might be dictionary encoded; here a fresh dictionary provider will do. And then there is the WritableByteChannel, which is a Java abstraction. The default file API in Java already has an implementation of this, which can be obtained simply by calling getChannel on a FileOutputStream object. However, you are free to implement your own as well. As we will show later in the blog, one does have to write one when reading and writing data from HDFS. So, at this point we have an ArrowFileWriter object.

The code corresponding to the setup discussion so far can be found in the setupWrite function in the ArrowWrite class in the example project.
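
Putting these pieces together, the writer setup might look roughly like this (a sketch only; the output file name is made up, and schema is the Schema object built earlier):

File arrowFile = new File("example.arrow"); // illustrative file name
FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
VectorSchemaRoot root = VectorSchemaRoot.create(schema, new RootAllocator(Integer.MAX_VALUE));
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
ArrowFileWriter arrowFileWriter = new ArrowFileWriter(root, provider, fileOutputStream.getChannel());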

Writing data

While writing data, we need to decide on the batch size. A batch size can be thought of as the unit of reading/writing Arrow data. When reading, an ArrowReader always reads a full batch of Arrow data in one go. Hence, the batch size dictates the amount of buffering and memory needed. In our example we have arbitrarily chosen 100 entries as the batch size.

All the writing logic is encapsulated between calls to the start and end functions on the ArrowFileWriter. The row count of a batch is set by calling VectorSchemaRoot.setRowCount(int), and batches are written by calling the writeBatch function. So, the data writing template looks something like:

arrowFileWriter.start();
// writing logic here in a loop of batches 
while(moreBatch){
   root.setRowCount(batchSize);
   // write your data 
   ...
   arrowFileWriter.writeBatch();
}
arrowFileWriter.end();
arrowFileWriter.close();

To write to a particular column/field we need to get a FieldVector, which encapsulates the inner buffers where the data is stored according to its type. This can be done by calling

FieldVector fieldVector = root.getVector("int"); // takes the field name 

Based upon the type of the Field, this FieldVector is of a type that corresponds to the underlying data type. Hence, one can explicitly cast it too. For example, an integer field gives an IntVector type, and so on. Check the javadocs to find out about the various FieldVector implementations.

Once we have an IntVector, we first need to allocate capacity, either by setting it explicitly (if known beforehand) or by using whatever the default is. This can be done as:

IntVector intVector = (IntVector) fieldVector;
intVector.setInitialCapacity(items);
intVector.allocateNew();

and then the values can be written by calling

for(int index = 0; index < items; index++){
  intVector.setSafe(index, isDefined, your_int_value);
}

index is the index in the column vector. isDefined says if the value is defined or not; if we have a null value, then we set it to 0. And lastly, we pass our int value. The setSafe function will resize the underlying buffer if it overflows. At last, we have to tell the intVector how many values we wrote by calling:

fieldVector.setValueCount(items);

That is it. Other primitive FieldVector implementations also follow the same pattern.
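
For instance, writing a long column with a BigIntVector looks almost identical; a minimal sketch, assuming a field named "long" and a placeholder your_long_value:

BigIntVector bigIntVector = (BigIntVector) root.getVector("long");
bigIntVector.setInitialCapacity(items);
bigIntVector.allocateNew();
for (int index = 0; index < items; index++) {
  bigIntVector.setSafe(index, 1, your_long_value); // 1 = value is defined
}
bigIntVector.setValueCount(items);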

For the variable-length binary byte[] type it is a bit more involved, as we need to tell the VarBinaryVector the length of the binary data. So the sequence for writing a binary value is

byte[] arr = ... 
varBinaryVector.setIndexDefined(index);
varBinaryVector.setValueLengthSafe(index, arr.length);
varBinaryVector.setSafe(index, arr);

Otherwise, if arr is null, then the corresponding index can be marked null by:

varBinaryVector.setNull(index);

After writing all values, the batch is flushed by calling arrowFileWriter.writeBatch() and the next batch is written. Once everything is done, end and close are called on the ArrowFileWriter as shown above.

The full sequence of the logic can be found in the writeData function in the ArrowWrite class in the example project.
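
For orientation, here is a condensed sketch of what that sequence can look like for a schema with one int and one binary column (entries, batchSize, and the next*Value helpers are placeholders, not names from the project):

arrowFileWriter.start();
for (int written = 0; written < entries; written += batchSize) {
  int toWrite = Math.min(batchSize, entries - written);
  root.setRowCount(toWrite);
  IntVector intVector = (IntVector) root.getVector("int");
  VarBinaryVector varBinaryVector = (VarBinaryVector) root.getVector("binary");
  intVector.setInitialCapacity(toWrite);
  intVector.allocateNew();
  varBinaryVector.allocateNew();
  for (int i = 0; i < toWrite; i++) {
    intVector.setSafe(i, 1, nextIntValue());            // 1 = value is defined
    byte[] arr = nextBinaryValue();
    varBinaryVector.setIndexDefined(i);
    varBinaryVector.setValueLengthSafe(i, arr.length);
    varBinaryVector.setSafe(i, arr);
  }
  intVector.setValueCount(toWrite);
  varBinaryVector.setValueCount(toWrite);
  arrowFileWriter.writeBatch();                         // flush this batch to the file
}
arrowFileWriter.end();
arrowFileWriter.close();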

Apache Arrow Java reader

We now move on to the reader-side objects that can be found in the ArrowRead.java class in the project at https://github.com/animeshtrivedi/ArrowExample/blob/master/src/main/java/com/github/animeshtrivedi/arrowexample/ArrowRead.java

How to find the schema

When reading an Arrow file, the reader needs to find the schema first. The sequence of the code is similar to what we have seen before. We first need to open the file and then allocate an ArrowFileReader object as

File arrowFile = new File("...");
FileInputStream fileInputStream = new FileInputStream(arrowFile);
SeekableReadChannel seekableReadChannel = new SeekableReadChannel(fileInputStream.getChannel());
ArrowFileReader arrowFileReader = new ArrowFileReader(seekableReadChannel,
        new RootAllocator(Integer.MAX_VALUE));

The arrowFileReader takes a SeekableReadChannel, a class defined by the Arrow project that wraps a seekable byte channel, such as the FileChannel obtained from a FileInputStream object. Once the reader is ready, just call

VectorSchemaRoot root  = arrowFileReader.getVectorSchemaRoot(); // get root 
Schema schema = root.getSchema(); // get schema 

to read the VectorSchemaRoot object which contains the schema of the data.

How to read data in Arrow

As we mentioned before, data is written out in batches. So we first read the batch metadata (as ArrowBlock objects) by:

List<ArrowBlock> arrowBlocks = arrowFileReader.getRecordBlocks();

We then have to load every block one-by-one to read the data. This can be done as

for (int i = 0; i < arrowBlocks.size(); i++) {
  ArrowBlock rbBlock = arrowBlocks.get(i);
  if (!arrowFileReader.loadRecordBatch(rbBlock)) { // load the batch 
    throw new IOException("Expected to read record batch");
  }
  // do something with the loaded batch 
  [...]  
}

Once a block is loaded, its metadata and data can be accessed. For example, the number of rows it contains is available via root.getRowCount(); loading a block sets the row count in the root.

Similar to the writing process, the reader needs to get hold of the FieldVectors that can be obtained as (after loading a block, of course):

List<FieldVector> fieldVector = root.getFieldVectors();

Then, based upon their MinorType, we can switch to type-specific readers:

Types.MinorType mt = fieldVector.get(j).getMinorType();
switch(mt){
   case INT       : ...; break;
   case BIGINT    : ...; break;
   case VARBINARY : ...; break;
}

What are MinorTypes? According to a discussion on the mailing list: "Minor types are a representation of the different vector types. I believe they are being de-emphasized in favor of FieldTypes, as minor types don't contain enough information to represent all vectors." A switch can instead be made on the ArrowType.ArrowTypeID that is saved in the FieldType, as

ArrowType.ArrowTypeID aID = fieldVector.get(j).getField().getFieldType().getType().getTypeID();

but I have not done so yet.
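
For the curious, such a switch might look something like this; a sketch only, not code from the example project:

ArrowType.ArrowTypeID aID = fieldVector.get(j).getField().getFieldType().getType().getTypeID();
switch (aID) {
  case Int           : ...; break; // covers both 32- and 64-bit integer vectors
  case FloatingPoint : ...; break; // Float4Vector or Float8Vector, depending on precision
  case Binary        : ...; break; // VarBinaryVector
  default            : throw new UnsupportedOperationException("unsupported type: " + aID);
}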

Coming back to the point of how to read the data: once the reader has the FieldVector and its type, it can be cast explicitly to the right data type, just like on the writer side. For example, for the integer type:

IntVector intVector = (IntVector) fieldVector.get(j);
for(int j = 0; j < intVector.getValueCount(); j++){
  if(!intVector.isNull(j)){     
int value = intVector.get(j);
    [...]
  } else {
    // the value is null 
    [...]    
  }
} // end of for loop 

Similarly, the byte[] values can be read as

VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector.get(j);
for(int j = 0; j < varBinaryVector.getValueCount(); j++){
  if(!varBinaryVector.isNull(j)){
    byte[] value = varBinaryVector.get(j);
    [...]
  } else {
    // the value is null 
    [...]
  }
}

The reader repeatedly loads a new batch and reads its values. Once done, we can close the reader with arrowFileReader.close().
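
Putting the reader pieces together, the read loop might look roughly like this (processInt and processBinary are placeholder calls for whatever you do with the values):

List<ArrowBlock> arrowBlocks = arrowFileReader.getRecordBlocks();
for (ArrowBlock rbBlock : arrowBlocks) {
  if (!arrowFileReader.loadRecordBatch(rbBlock)) {
    throw new IOException("Expected to read record batch");
  }
  IntVector intVector = (IntVector) root.getVector("int");
  VarBinaryVector varBinaryVector = (VarBinaryVector) root.getVector("binary");
  for (int j = 0; j < root.getRowCount(); j++) {
    if (!intVector.isNull(j)) {
      processInt(intVector.get(j));
    }
    if (!varBinaryVector.isNull(j)) {
      processBinary(varBinaryVector.get(j));
    }
  }
}
arrowFileReader.close();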

Writing custom WritableByteChannel and SeekableReadChannel

WritableByteChannel and SeekableReadChannel implementations are expected in the ArrowFileWriter and ArrowFileReader constructors. At times, for many popular I/O interfaces (e.g., HDFS), WritableByteChannel and SeekableReadChannel implementations may not be readily available. So, it is absolutely possible to write your own implementations as well. I have written such implementations for HDFS; without going into further details, the idea is sketched below.
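
To give an idea, a bare-bones WritableByteChannel over an HDFS FSDataOutputStream could be sketched like this (an illustration under those assumptions, not the exact code from the project):

public class HdfsWritableByteChannel implements WritableByteChannel {
  private final FSDataOutputStream out; // org.apache.hadoop.fs.FSDataOutputStream
  private boolean open = true;

  public HdfsWritableByteChannel(FSDataOutputStream out) {
    this.out = out;
  }

  public int write(ByteBuffer src) throws IOException {
    // copy the buffer contents into a byte[] and hand it to the HDFS stream
    int length = src.remaining();
    byte[] temp = new byte[length];
    src.get(temp);
    out.write(temp);
    return length;
  }

  public boolean isOpen() {
    return open;
  }

  public void close() throws IOException {
    out.close();
    open = false;
  }
}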

Example Apache Parquet to Arrow convertor

Here is an example to convert existing Parquet data to Arrow: https://gist.github.com/animeshtrivedi/76de64f9dab1453958e1d4f8eca1605f