November 2018 update with a performance investigation: part 1, part 2, part 3.
- Introduction
- Example project setup and dependencies
- Apache Arrow Java writer
- Apache Arrow Java reader
- Writing custom WritableByteChannel and SeekableReadChannel
- Example Apache Parquet to Arrow converter
In this blog post I am going to show how to write and read Apache Arrow files in a stand-alone Java program.
Apache Arrow (https://arrow.apache.org/) is a popular in-memory columnar storage format. It is to memory what Parquet/ORC are to disk-oriented columnar storage formats. The goal of the project is to standardize in-memory columnar data representations for all data processing engines (e.g., Spark, Drill, Impala, etc.). This standardization helps with reducing communication and serialization overheads, increases the shared code base to manage data (e.g., reading Parquet into the Arrow format), and promises to improve performance as well. For more details about Arrow please refer to the website.
Apache Arrow JavaDocs are available here.
All the code I am going to discuss here is available as a Maven project at https://github.com/animeshtrivedi/ArrowExample
You can specify the Arrow Maven dependencies in your project by putting these in the pom file:
<dependencies>
  <dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-memory</artifactId>
    <version>0.8.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.arrow</groupId>
    <artifactId>arrow-vector</artifactId>
    <version>0.8.0</version>
  </dependency>
</dependencies>
I highly recommend using the 0.8 release of Arrow. But if you are interested in how it can be done with the 0.7 release, then check the 0.7-example branch in the GitHub project.
We first start with the pertinent writer-side objects that can be found in the ArrowWrite.java class in the project at https://github.com/animeshtrivedi/ArrowExample/blob/master/src/main/java/com/github/animeshtrivedi/arrowexample/ArrowWrite.java
We start with how to define an Arrow schema on the writer side. This is done by defining a list of Field objects and passing them to make a Schema object. A Field object takes a string name, a type, and an additional children parameter if the type is a complex type (like list, map, etc.). We will be covering simple primitive types in this blog post. A field object of type integer can be defined as:
Field intField = new Field("int", FieldType.nullable(new ArrowType.Int(32, true)), null);
In this example, 32 gives the bit-width, and true says that it is a signed integer. The last null indicates that the field has no complex children. In the ArrowWrite class, there are more examples of how to define long, float, double, etc. (a short sketch also follows below). Another important primitive type to define is the byte[] type (any arbitrary data type can be serialized to and de-serialized from byte[]). A byte[] field is defined as:
Field binaryField = new Field("binary", FieldType.nullable(new ArrowType.Binary()), null);
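For completeness, here is a hedged sketch of how a few more primitive fields could be defined following the same pattern (the field names are made up for illustration; FloatingPointPrecision is in org.apache.arrow.vector.types):
Field longField = new Field("long", FieldType.nullable(new ArrowType.Int(64, true)), null);
Field floatField = new Field("float", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null);
Field doubleField = new Field("double", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null);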
Having defined multiple Fields, we now have to stitch them together to make a schema. This can be done as simply as:
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
childrenBuilder.add(intField);
childrenBuilder.add(binaryField);
...
new Schema(childrenBuilder.build(), null);
A Schema object constructor only requires an iterable list of fields:
Schema(Iterable<Field> fields, Map<String, String> metadata);
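The second parameter is an optional metadata map; it can be null, as above, or populated with custom key-value entries (the key and value here are made up for illustration):
Map<String, String> metadata = new HashMap<>();
metadata.put("created-by", "ArrowExample");
Schema schemaWithMetadata = new Schema(childrenBuilder.build(), metadata);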
The next important class to understand is VectorSchemaRoot. It is responsible for managing reader and writer interfaces, memory allocation, and more. So, we first allocate a VectorSchemaRoot object as:
VectorSchemaRoot root = VectorSchemaRoot.create(schema, new RootAllocator(Integer.MAX_VALUE));
Arrow file I/O interfaces are defined in the ArrowFileReader and ArrowFileWriter classes. We need a writer to write a file. Its constructor is defined as:
public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out)
We already have a VectorSchemaRoot. A DictionaryProvider can be allocated by simply calling new DictionaryProvider.MapDictionaryProvider(). Dictionaries are important when passing a reader's data to another writer, where the data might be dictionary encoded; here a fresh dictionary provider will do. And then there is the WritableByteChannel, which is a Java abstraction. The default file API in Java already has an implementation of it, which can be obtained simply by calling getChannel on a FileOutputStream object. However, you are free to implement your own as well; as we will show later in the blog, one does have to write one when reading and writing data on HDFS. So, at this point we have an ArrowFileWriter object.
The code corresponding to the setup discussion so far can be found in the setupWrite function in the ArrowWrite class in the example project.
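Putting the pieces together, a minimal writer setup might look like this sketch (the output file name is made up, and error handling is omitted):
File arrowFile = new File("example.arrow"); // hypothetical output file
FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
ArrowFileWriter arrowFileWriter = new ArrowFileWriter(root, provider, fileOutputStream.getChannel());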
While writing data, we need to decide on the batch size. A batch size can be thought of as the unit of reading/writing Arrow data. When reading, an ArrowReader always reads a full batch of Arrow data in one go. Hence, the batch size dictates the amount of buffering and memory needed. In our example we have arbitrarily chosen a batch size of 100 entries.
All the writing logic is encapsulated between calls to the start and end functions on the ArrowFileWriter. The row count of a batch is set by calling VectorSchemaRoot.setRowCount(int), and batches are written by calling the writeBatch function. So, the data writing template looks something like:
arrowFileWriter.start();
// writing logic here in a loop of batches
while (moreBatch) {
    root.setRowCount(batchSize);
    // write your data
    ...
    arrowFileWriter.writeBatch();
}
arrowFileWriter.end();
arrowFileWriter.close();
To write to a particular column/field we need to get its FieldVector, which encapsulates the inner buffers where the data is stored according to its type. This can be done by calling:
FieldVector fieldVector = root.getVector("int"); // takes the field name as defined in the schema
Based upon the type of the Field, this FieldVector is of a concrete type that corresponds to the underlying data type. Hence, one can explicitly cast it. For example, an integer field gives an IntVector, and so on. Check the javadocs to find out about the various FieldVector implementations.
Once we have an IntVector, we first need to allocate a capacity, either by setting it explicitly (if known beforehand) or by using whatever the default is. This can be done as:
IntVector intVector = (IntVector) fieldVector;
intVector.setInitialCapacity(items);
intVector.allocateNew();
and then the values can be written by calling:
for (int index = 0; index < items; index++) {
    intVector.setSafe(index, isDefined, your_int_value);
}
Here, index is the index in the column vector, and isDefined says whether the value is defined or not (if we have a null value, we set it to 0). Lastly, we pass our int value. The setSafe function will resize the underlying buffer if it overflows. Finally, we have to tell the intVector how many values we wrote by calling:
fieldVector.setValueCount(items);
That is it. Other primitive FieldVector implementations follow the same pattern.
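For example, assuming the schema also contains a long field named "long", a BigIntVector could be written the same way (a sketch; the field name and values are illustrative):
BigIntVector bigIntVector = (BigIntVector) root.getVector("long"); // assuming a field named "long"
bigIntVector.setInitialCapacity(items);
bigIntVector.allocateNew();
for (int index = 0; index < items; index++) {
    bigIntVector.setSafe(index, 1, your_long_value); // 1 = the value is defined
}
bigIntVector.setValueCount(items);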
For the variable-length binary byte[] type it is a bit more involved, as we need to tell the VarBinaryVector the length of the binary data. So the sequence for writing a binary value is:
byte[] arr = ...
varBinaryVector.setIndexDefined(index);
varBinaryVector.setValueLengthSafe(index, arr.length);
varBinaryVector.setSafe(index, arr);
Otherwise, if arr is null, the corresponding index can be marked null by:
varBinaryVector.setNull(index);
After writing all values, the batch is flushed by calling arrowFileWriter.writeBatch() and the next batch is written. Once everything is done, end and close are called on the ArrowFileWriter as shown above.
The full sequence of this logic can be found in the writeData function in the ArrowWrite class in the example project; a condensed sketch follows.
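Here is a condensed, hypothetical sketch of what a writeData-style loop might look like (the writeFieldInt and writeFieldVarBinary helpers are made up for illustration; the real logic lives in the example project):
arrowFileWriter.start();
for (int start = 0; start < items; start += batchSize) {
    int toWrite = Math.min(batchSize, items - start);
    root.setRowCount(toWrite); // row count for this batch
    for (Field field : root.getSchema().getFields()) {
        FieldVector vector = root.getVector(field.getName());
        switch (vector.getMinorType()) {
            case INT       : writeFieldInt(vector, start, toWrite); break;       // hypothetical helper
            case VARBINARY : writeFieldVarBinary(vector, start, toWrite); break; // hypothetical helper
            default        : throw new IllegalArgumentException("unsupported type");
        }
    }
    arrowFileWriter.writeBatch(); // flush the batch
}
arrowFileWriter.end();
arrowFileWriter.close();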
We now move on to the reader-side objects that can be found in the ArrowRead.java class in the project at https://github.com/animeshtrivedi/ArrowExample/blob/master/src/main/java/com/github/animeshtrivedi/arrowexample/ArrowRead.java
When reading an Arrow file, the reader first needs to find the schema. The sequence of the code is quite similar to what we have seen before. We first need to open the file and then allocate an ArrowFileReader object as:
File arrowFile = new File("...");
FileInputStream fileInputStream = new FileInputStream(arrowFile);
SeekableReadChannel seekableReadChannel = new SeekableReadChannel(fileInputStream.getChannel());
ArrowFileReader arrowFileReader = new ArrowFileReader(seekableReadChannel,
        new RootAllocator(Integer.MAX_VALUE));
The ArrowFileReader takes a SeekableReadChannel, an interface defined by the Arrow project. It has a default implementation for the FileChannel that can be obtained from a FileInputStream object. Once the reader is ready, just call:
VectorSchemaRoot root = arrowFileReader.getVectorSchemaRoot(); // get root
Schema schema = root.getSchema(); // get schema
to read the VectorSchemaRoot object, which contains the schema of the data.
As we mentioned before, data is written out in batches. So we first read the batch metadata (as ArrowBlock objects) by:
List<ArrowBlock> arrowBlocks = arrowFileReader.getRecordBlocks();
We then have to load each block one by one to read its data. This can be done as:
for (int i = 0; i < arrowBlocks.size(); i++) {
    ArrowBlock rbBlock = arrowBlocks.get(i);
    if (!arrowFileReader.loadRecordBatch(rbBlock)) { // load the batch
        throw new IOException("Expected to read record batch");
    }
    // do something with the loaded batch
    [...]
}
Once a block is loaded, its metadata and data can be accessed. For example, root.getRowCount() tells how many rows the block contains; loading a block sets the row count in the root.
Similar to the writing process, the reader needs to get hold of the FieldVectors, which can be obtained (after loading a block, of course) as:
List<FieldVector> fieldVector = root.getFieldVectors();
Then, based upon their MinorType, we can switch to type-specific readers:
Types.MinorType mt = fieldVector.get(j).getMinorType();
switch (mt) {
    case INT       : ...; break;
    case BIGINT    : ...; break;
    case VARBINARY : ...; break;
}
What are MinorTypes? According to a discussion on the mailing list: "Minor types are a representation of the different vector types. I believe they are being de-emphasized in favor of FieldTypes, as minor types don't contain enough information to represent all vectors." Alternatively, a switch can be made on the ArrowType.ArrowTypeID that is saved in the FieldType as
ArrowType.ArrowTypeID aID = fieldVector.get(j).getField().getFieldType().getType().getTypeID();
but I have not done so yet.
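Such a switch could look something like the following sketch (with the case bodies elided, just as in the MinorType switch above):
switch (aID) {
    case Int           : ...; break;
    case FloatingPoint : ...; break;
    case Binary        : ...; break;
}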
Coming back to how to read the data: once the reader has the FieldVector and its type, the vector can be explicitly cast into the right data type, just like on the writer side. For example, for the integer type:
IntVector intVector = ((IntVector) fx);
for (int j = 0; j < intVector.getValueCount(); j++) {
    if (!intVector.isNull(j)) {
        int value = intVector.get(j);
        [...]
    } else {
        // the value is null
        [...]
    }
} // end of for loop
Similarly, the byte[] values can be read as:
VarBinaryVector varBinaryVector = ((VarBinaryVector) fx);
for (int j = 0; j < varBinaryVector.getValueCount(); j++) {
    if (!varBinaryVector.isNull(j)) {
        byte[] value = varBinaryVector.get(j);
        [...]
    } else {
        // the value is null
        [...]
    }
}
The reader repeatedly loads a new batch and reads its values. Once done, we can close the reader:
arrowFileReader.close();
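Putting the reading steps together, a condensed sketch of the whole read path (with the per-type reading elided, see the snippets above) might look like:
ArrowFileReader arrowFileReader = new ArrowFileReader(seekableReadChannel,
        new RootAllocator(Integer.MAX_VALUE));
VectorSchemaRoot root = arrowFileReader.getVectorSchemaRoot();
for (ArrowBlock rbBlock : arrowFileReader.getRecordBlocks()) {
    if (!arrowFileReader.loadRecordBatch(rbBlock)) { // load the next batch
        throw new IOException("Expected to read record batch");
    }
    for (FieldVector fx : root.getFieldVectors()) {
        switch (fx.getMinorType()) {
            case INT       : /* cast to IntVector and read as above */       break;
            case VARBINARY : /* cast to VarBinaryVector and read as above */ break;
            default        : break;
        }
    }
}
arrowFileReader.close();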
WritableByteChannel and SeekableReadChannel implementations are expected by the ArrowFileWriter and ArrowFileReader constructors. For many popular I/O interfaces (e.g., HDFS), WritableByteChannel and SeekableReadChannel implementations may not be readily available. So, it is absolutely possible to write your own implementations as well. Without going into further details, here are some code snippets that I have written for HDFS:
- HDFSWritableByteChannel.java : https://gist.github.com/animeshtrivedi/436d95bd8a43a6f47e580594cb8138c3
- HdfsSeekableByteChannel.java : https://gist.github.com/animeshtrivedi/ac1ecab482c10f902560eafce1043421
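To give a flavor of what such an implementation involves, here is a minimal, hypothetical sketch of a WritableByteChannel over an HDFS FSDataOutputStream (a simplified stand-in; the gists above hold the full versions, including the seekable read side):
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import org.apache.hadoop.fs.FSDataOutputStream;

public class HdfsWritableByteChannel implements WritableByteChannel {
    private final FSDataOutputStream out;
    private boolean open = true;

    public HdfsWritableByteChannel(FSDataOutputStream out) {
        this.out = out;
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        // drain the buffer's remaining bytes into the HDFS output stream
        int length = src.remaining();
        byte[] temp = new byte[length];
        src.get(temp);
        out.write(temp, 0, length);
        return length;
    }

    @Override
    public boolean isOpen() {
        return open;
    }

    @Override
    public void close() throws IOException {
        out.close();
        open = false;
    }
}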
Here is an example to convert existing Parquet data to Arrow: https://gist.github.com/animeshtrivedi/76de64f9dab1453958e1d4f8eca1605f