ARROW-542: Adding dictionary encoding to FileWriter #334
Conversation
Will review when I can (possibly tomorrow, or Monday). I will make an effort to do the C++ side of the implementation in the next 2 weeks to help move us toward integration tests.
Watch out for
thanks, I'll squash/rebase once I've gotten closer to finishing.
Force-pushed from d40f2c5 to a654075
I'll give this some more careful review tomorrow -- I will start on the C++ implementation tomorrow or this weekend sometime
@julienledem if you have time to review, that would be very helpful. thanks!
thanks, I think it's pretty much done, at least for a first cut
Awesome. Really excited to get this working
I'm currently reviewing
Thanks a lot for your contribution.
This is a big patch.
Please see my comments below.
for (ArrowRecordBatch batch: batches) {
  writer.writeRecordBatch(batch);
  batch.close();
why was this not needed before?
I think it was just an oversight - ArrowRecordBatch is Closeable, but they weren't ever being closed here.
Byte type = reader.nextBatchType();
if (type == null) {
  break;
} else if (type == MessageHeader.RecordBatch) {
How about:
else { switch (...
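A minimal sketch of that suggestion, assuming the reader API and MessageHeader constants from the quoted diff (this fragment sits inside the same read loop as the original, so the break in the null branch exits that loop):

Byte type = reader.nextBatchType();
if (type == null) {
  break; // end of stream, exits the enclosing read loop
} else {
  switch (type) { // the MessageHeader constants are byte values, valid as case labels
    case MessageHeader.RecordBatch:
      // read and handle a record batch
      break;
    case MessageHeader.DictionaryBatch:
      // read and handle a dictionary batch
      break;
    default:
      throw new IOException("Unexpected message type: " + type);
  }
}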
Byte type = reader.nextBatchType();
if (type == null) {
  break;
} else if (type == MessageHeader.DictionaryBatch) {
switch?
Byte type = reader.nextBatchType();
assertEquals(new Byte(MessageHeader.RecordBatch), type);
try (ArrowRecordBatch result = reader.nextRecordBatch();) {
  assertTrue(result != null);
assertNotNull
ArrowRecordBatch result = reader.nextRecordBatch();
assertTrue(result == null);
Byte type = reader.nextBatchType();
assertTrue(type == null);
assertNull
@@ -49,12 +53,12 @@ public DictionaryVector(ValueVector indices, Dictionary dictionary) {
 * @param vector vector to encode
 * @return dictionary encoded vector
 */
public static DictionaryVector encode(ValueVector vector) {
public static DictionaryVector encode(FieldVector vector) {
sorry I missed this in the previous review, but the dictionary should be provided as a param at this point.
fixed
dictionaryVector.allocateNewSafe();
for (Map.Entry<Integer, Integer> entry: transfers.entrySet()) {
  dictionaryTransfer.copyValueSafe(entry.getKey(), entry.getValue());
}
dictionaryVector.getMutator().setValueCount(transfers.size());
Dictionary dictionary = new Dictionary(dictionaryVector, false);
Dictionary dictionary = new Dictionary(dictionaryVector);
We should not create Dictionaries without giving them an ID.
Sorry that I've added confusion on this point. At least in the C++ implementation I don't intend to assign dictionary ids until entering the IPC code paths
  throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
}
return batch;
}
some common code with the method below.
@@ -64,7 +67,7 @@ public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
private void checkStarted() throws IOException {
  if (!started) {
    started = true;
    start();
    writeMagic();
it looks like we don't write the schema anymore?
the schema is written in the footer - as far as I can tell, there was no reason to write it out a second time, and removing it didn't break anything.
I think @nongli ran into this issue (schema being written twice)?
There's some ambiguity on whether the file serialization should be the stream bytes exactly, with a magic number header and additional metadata (schema, block index for random access, maybe stats in the future). In this case, the schema would be there twice.
The goal would be to make it as simple as possible (and a minor benefit of being more efficient going from file -> stream: just one seek and read).
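For concreteness, the layout being discussed would wrap the exact stream bytes in file metadata, roughly like this (a sketch of the idea under discussion, not a finalized spec at the time):

magic "ARROW1"
<stream bytes: schema message, dictionary batches, record batches>
footer: schema (again) + block index for random access
footer length
magic "ARROW1"

With this shape, going from file to stream is one seek past the header and a single contiguous read, at the cost of the schema appearing twice.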
ok, I'll add it back in
@@ -36,6 +38,7 @@
private ReadChannel in;
private final BufferAllocator allocator;
private Schema schema;
should be final
schema is read from the input stream now, so it can't be final as it's not initially loaded
makes sense. Thanks
thanks, I will fix this up next week
I'm working on the C++ patch, should have it up later today or tomorrow. Integration tests will require follow up patches to add dictionary support to the JSON readers and writers.
As I believe @elahrvivaz noted elsewhere, this is pretty tricky to implement because the schema for the record batch embedded in the
I don't think this experience suggests we should change anything about the metadata, but we may be able to offer some advice to future language implementations (cough JavaScript) to make the overall experience a little kinder.
Force-pushed from 2bc383f to fa73be5
@julienledem @wesm to consolidate the schema logic per your suggestion, I've merged the VectorLoader/Unloader with the ArrowWriter/Reader. Using them is a lot more straightforward now, but this is a pretty massive change - let me know if it's too much or hides too much functionality.
This is nice work -- I think we're pretty close except for the code formatting issues and the issue around dictionaries in nested schemas.
You should add 2 test cases to account for the 2 more complicated dictionary-encoded cases: a dictionary-encoded nested vector (e.g. a List<Int32> where the lists are dictionary encoded -- i.e. if we had [[0, 1], [0], [0, 1], [0]], you could encode this to [0, 1, 0, 1] with dictionary [[0, 1], [0]]) and a nested vector containing dictionary-encoded values (I wrote a test case with List<String: dictionary-encoded>) -- this will force resolving the schema issue.
I think the refactoring to combine the VectorLoader/Unloader makes sense.
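To make the two cases concrete, here is a small self-contained sketch using plain Java collections rather than Arrow vectors (everything here is illustrative, not this patch's API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DictionaryCases {
  // Generic dictionary encoder: returns indices; fills the dictionary
  // in order of first appearance.
  static <T> List<Integer> encode(List<T> values, Map<T, Integer> dictionary) {
    List<Integer> indices = new ArrayList<>();
    for (T v : values) {
      Integer idx = dictionary.get(v);
      if (idx == null) {
        idx = dictionary.size();
        dictionary.put(v, idx);
      }
      indices.add(idx);
    }
    return indices;
  }

  public static void main(String[] args) {
    // Case 1: the lists themselves are dictionary encoded.
    List<List<Integer>> lists = Arrays.asList(
        Arrays.asList(0, 1), Arrays.asList(0), Arrays.asList(0, 1), Arrays.asList(0));
    Map<List<Integer>, Integer> listDict = new LinkedHashMap<>();
    System.out.println(encode(lists, listDict)); // [0, 1, 0, 1]
    System.out.println(listDict.keySet());       // [[0, 1], [0]]

    // Case 2: List<String> where the inner strings are dictionary encoded:
    // [['foo','bar'], ['foo'], ['bar']] becomes [[0, 1], [0], [1]].
    Map<String, Integer> strDict = new LinkedHashMap<>();
    for (List<String> inner : Arrays.asList(
        Arrays.asList("foo", "bar"), Arrays.asList("foo"), Arrays.asList("bar"))) {
      System.out.println(encode(inner, strDict)); // [0, 1] then [0] then [1]
    }
    System.out.println(strDict.keySet());         // [foo, bar]
  }
}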
@@ -57,30 +57,28 @@ public ClientConnection(Socket socket) {

public void run() throws IOException {
  BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
  List<ArrowRecordBatch> batches = new ArrayList<ArrowRecordBatch>();
  List<ArrowRecordBatch> batches = new ArrayList<>();
  List<ArrowDictionaryBatch> dictionaries = new ArrayList<>();
Not used here?
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.ArrowStreamReader;
Seems you might have an import ordering problem. Some other code formatting issues below
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import ordering
think I fixed the ordering, let me know if you see anything still wrong
private ValueVector indices;
private Dictionary dictionary;
private final FieldVector indices;
agreed
public Field getField() { return indices.getField(); }
public Field getField() {
  Field field = indices.getField();
  return new Field(field.getName(), field.isNullable(), field.getType(), dictionary.getEncoding(), field.getChildren());
The transference of the field metadata from the indices to the dictionary should probably happen elsewhere (per above)
// go through to add dictionary id to the schema fields and to unload the dictionary batches
for (FieldVector vector: vectors) {
  if (vector instanceof DictionaryVector) {
Is vectors the flattened tree (I don't think so, but just checking)? Dictionaries can occur anywhere in a type tree, but the one constraint is that dictionaries cannot contain dictionary-encoded data in their descendants.
As an example, consider the type List<String>, where the strings are dictionary encoded, so you might have [['foo', 'bar'], ['foo'], ['bar']] but really it's [[0, 1], [0], [1]], and the inner child vector is a DictionaryVector. I have a test case for this in the C++ patch: https://github.com/wesm/arrow/blob/ARROW-459/cpp/src/arrow/ipc/test-common.h#L377
I confirm that vectors is the top level. A vector for a complex type will have children.
I don't think the current dictionary encoding will work with complex types (lists or structs). I asked about that before and you guys said to hold off. I think that will require making dictionary a minor type so that dictionary encoded fields can be handled by the writers and readers.
I guess I need to examine the minor type stuff more closely so I understand this. We don't have the concept of a minor type in the C++ library, because the data type class is the logical type. Seems like things are inverted here (the type classes are physical types, and the minor type is the logical type), and that is making things more difficult. I'm sorry if I've created more work with incorrect cross-communication
I'm confirming that it is fine to hold off on dictionary encoding for complex types. However a primitive type can be dictionary encoded and be a child of a complex type.
See my comment below:
#334 (comment)
This should clarify how to create the DictionaryVector in the right place.
Field replacement = new Field(field.getName(), field.isNullable(), dictionaryType, dictionary.getEncoding(), field.getChildren());

updatedFields.remove(fieldIndex);
updatedFields.add(fieldIndex, replacement);
The way I dealt with the tree traversal issue (with dictionaries occurring in children perhaps) was in 3 stages:
- first pass computes a mapping of dictionary id to Field, so I know the schema of each dictionary: https://github.com/wesm/arrow/blob/ARROW-459/cpp/src/arrow/ipc/metadata.cc#L170
- read and load all dictionary vectors
- now construct schema tree with map<long, Dictionary> in hand
so I believe this is the reverse order of what is in this patch now. I'm not sure which is easier
agreed.
When Reading a File:
- you first get the schema. From the schema you can produce a Map<DictionaryID, Field> which will capture the schema of each DictionaryBatch.
- Then you read each DictionaryBatch. You know their schema from the mapping above, which lets you load each dictionary into a vector (possibly a complex one with children). You get a list of Dictionaries. Each dictionary is defined as follows: Dictionary { DictionaryID id; Field schema; ValueVector values }
- Then you read each RecordBatch. Fields that are dictionary encoded will have a DictionaryVector instead of the ValueVector of the original type. Possibly you need to create a new MinorType for that (basically one MinorType per Vector class). A sketch of this sequence follows.
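A hedged sketch of that reading sequence (accessor and helper names are assumptions for illustration, not the API in this patch; a real traversal would also need to recurse into child fields, since encoded fields can be nested):

// 1. From the schema, map each dictionary id to the Field it encodes.
Map<Long, Field> dictionaryFields = new HashMap<>();
for (Field field : schema.getFields()) {
  DictionaryEncoding encoding = field.getDictionary(); // assumed accessor
  if (encoding != null) {
    dictionaryFields.put(encoding.getId(), field);
  }
}

// 2. Read each DictionaryBatch; the mapping above says how to load its values.
Map<Long, Dictionary> dictionaries = new HashMap<>();
for (ArrowDictionaryBatch batch : readDictionaryBatches()) { // hypothetical helper
  long id = batch.getDictionaryId();
  Field field = dictionaryFields.get(id);
  ValueVector values = loadVector(field, batch);             // hypothetical helper
  dictionaries.put(id, new Dictionary(id, field, values));   // Dictionary { id, schema, values }
}

// 3. Read each RecordBatch; encoded fields get a DictionaryVector over their
//    index data, resolved against the dictionaries map.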
@julienledem I originally went down the path of adding a new minor type for dictionaries, but @wesm asked me to roll those changes back and keep dictionary vector as a synthetic type... making it a minor type would modify all the generated classes.
@julienledem having a "synthetic type" (that composes the dictionary vector and the indices type) made things significantly easier, at least on the C++ side. See, for example: https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/test-common.h#L360
I'm not sure what the benefits are of adding a MinorType (and touching all the generated code)
Let me clarify this.
We should not add a type to the Messages.fbs for dictionary. I think that was part of the original PR and was rolled back.
The MinorType class is more of a Vector Class ID. It should be renamed IMO but this is a separate discussion. In my comment I said "possibly", meaning I was not quite sure, so let's take the MinorType comment out for now.
Here is what I was thinking about:
We need to change the implementation of FieldVector.initializeChildrenFromFields() [1]. Look in particular at the implementation in MapVector [2] that uses AbstractMapVector.add [3], which uses MinorType.getNewVector [4] to create a new vector.
initializeChildrenFromFields is responsible for instantiating the right Vector implementation based on each Field in the Schema. So really [2]/[3] should be changed to create a DictionaryVector instead of the type that would be used if not dictionary encoded.
That would replace some of the logic in ArrowReader.initialize() in this PR. This may not require a new MinorType, you'll see when you're in there (a sketch of the idea follows the references below).
[1] void initializeChildrenFromFields(List<Field> children);
[2] public void initializeChildrenFromFields(List<Field> children) {
[3] arrow/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractMapVector.java, line 170 in d28f1c1: FieldVector vector = minorType.getNewVector(name, allocator, callBack, precisionScale);
[4] public abstract FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale);
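A hedged sketch of what the dictionary-aware branch in [2]/[3] might look like (the DictionaryVector constructor and the index-vector choice are assumptions, not code from this patch; see also the DictionaryProvider idea later in this thread):

// Inside AbstractMapVector.add, roughly: pick the vector class from the
// field as today, but wrap the physical index vector in a DictionaryVector
// when the field carries a dictionary encoding.
FieldVector vector;
if (field.getDictionary() == null) {
  vector = minorType.getNewVector(name, allocator, callBack, precisionScale);
} else {
  // indices are stored physically (e.g. Int32); the dictionary values are
  // resolved later, once the dictionary batches have been read
  FieldVector indices = MinorType.INT.getNewVector(name, allocator, callBack);
  vector = new DictionaryVector(indices, field.getDictionary().getId()); // assumed constructor
}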
@julienledem I'll look into this more closely, but wouldn't the problem be that we don't have the actual dictionary in FieldVector.initializeChildrenFromFields? Currently a DictionaryVector requires a dictionary.
another possibility would be to require the user to dictionary encode their own vectors - e.g. if you define a field as dictionary encoded, you have to set the appropriate index values yourself, instead of the dictionary values. Then the dictionaries themselves are only required when converting to/from message format
I feel like a lot of the friction comes from the fact that much of the code assumes Fields are interchangeable with vectors (in that you can create a vector instance from a field when needed), but that is not true for dictionary vectors.
@elahrvivaz I'd suggest that we decouple the Dictionary from the DictionaryVector for now. For context, one of the goals of dictionary encoding is to work with dictionary ids directly (to do aggregations, joins, etc faster). We don't always need the Dictionary around when doing so.
The DictionaryVector can have a reference to the dictionary id instead of the dictionary itself. You could have a DictionaryProvider { Dictionary getDictionary(ID) } that returns the dictionary for the id. That would remove the dependency on having a reference to the Dictionary to create the DictionaryVector and still allow decoding a vector by looking up the corresponding dictionary.
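A minimal sketch of that decoupling, using the names from the comment (exact shapes are assumptions):

// The provider resolves an id to a dictionary on demand.
public interface DictionaryProvider {
  Dictionary getDictionary(long id);
}

// The vector keeps only the id, so it can be created without the dictionary.
public class DictionaryVector {
  private final FieldVector indices;
  private final long dictionaryId;

  public DictionaryVector(FieldVector indices, long dictionaryId) {
    this.indices = indices;
    this.dictionaryId = dictionaryId;
  }

  // Decoding looks the dictionary up only when actually needed,
  // e.g. when converting to/from the message format.
  public ValueVector decode(DictionaryProvider provider) {
    Dictionary dictionary = provider.getDictionary(dictionaryId);
    // ... materialize values by resolving each index against dictionary ...
    throw new UnsupportedOperationException("elided in this sketch");
  }
}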
that sounds good, thanks
@Override
public int hashCode() {
  return Objects.hash(id, dictionary, ordered);
}
formatting issues in this file beyond indentation
public boolean isOrdered() {
  return ordered;
}
formatting issues
org.apache.arrow.flatbuf.DictionaryEncoding.startDictionaryEncoding(builder);
org.apache.arrow.flatbuf.DictionaryEncoding.addId(builder, dictionary.getId());
org.apache.arrow.flatbuf.DictionaryEncoding.addIsOrdered(builder, dictionary.isOrdered());
dictionaryOffset = org.apache.arrow.flatbuf.DictionaryEncoding.endDictionaryEncoding(builder);
I'm not handling the null case for the indexType yet in C++, but we can address during integration testing
@elahrvivaz, thanks for working on this.
I think we should not be merging VectorLoader/Unloader with the ArrowWriter/Reader. The goal is to be able to use VectorLoader and Unloader in other contexts than file reading/writing. They're general purpose tools to load/unload buffers into vectors. If you want to define a class that encompasses reading a file and loading it into vectors then this should be a separate class that uses both the ArrowReader and VectorLoader (favor composition over inheritance).
Sorry to spoil the fun. I do appreciate you working on this and I'm trying to not make you do extra work. I realize my previous review may have been a little unclear. Feel free to ask clarification on comments. Hopefully this one is better. We can also do a hangout if there's a need to discuss something.
arrowWriter.start();
while (true) {
  int loaded = arrowReader.loadNextBatch();
  if (loaded == 0) {
int loaded;
while ((loaded = arrowReader.loadNextBatch()) != 0) {
  arrowWriter.writeBatch(loaded);
}
  writer.write(root);
}
int loaded = arrowReader.loadRecordBatch(rbBlock);
root.setRowCount(loaded);
It seems strange that we need to call setRowCount() separately from calling loadRecordBatch()
This makes the API less clear.
We should be able to load with one call. This seems to indicate that the loading happens at the wrong level.
(in this change the ArrowFileReader doesn't know the VectorSchemaRoot)
VectorSchemaRoot is supposed to bundle together everything that defines a record batch:
- schema
- vectors
- rowCount (which is the same for all vectors)
While the schema and the list of vectors are immutable, the content of the vectors and the rowCount change every time we load a new RecordBatch.
Possibly the problem with VectorSchemaRoot is that the logic in its constructors should be moved outside of it (for example, creating vectors from the schema)
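A hedged sketch of a one-call shape (the loader field and batch-length accessor are assumptions):

// Instead of: int loaded = arrowReader.loadRecordBatch(rbBlock); root.setRowCount(loaded);
public boolean loadRecordBatch(ArrowBlock block, VectorSchemaRoot root) throws IOException {
  try (ArrowRecordBatch batch = readRecordBatch(block)) { // assumed internal reader helper
    if (batch == null) {
      return false;
    }
    loader.load(batch, root);            // fill the vectors
    root.setRowCount(batch.getLength()); // row count set in the same call
    return true;
  }
}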
In this case I was just re-using the VectorSchemaRoot in the unit test for simplicity, because the test checks it, so I wrapped the fields that are loaded into it.
The normal use case would be:
- construct a reader on an input stream
- schema and fields that are loaded are available via getters in the reader - they will read from the input stream when retrieved if not already loaded
- call loadNextBatch or loadRecordBatch (for random access) - returns number of records loaded, and populates fields with those records
- (do something with loaded fields)
It seemed redundant to have the VectorSchemaRoot, when most methods were retrieving the schema from the footer, not the VectorSchemaRoot. If we keep it around, the VectorSchemaRoot would have to be constructed in the reader, since we don't have the schema until something is actually read.
FieldVector from = rootVectors.get(i);
FieldVector to = arrowWriter.getVectors().get(i);
TransferPair transfer = from.makeTransferPair(to);
transfer.transfer();
From an API point of view, I don't think the writer should need to have its own vectors where the buffers need to be transferred.
fixed
/**
 * Loads buffers into vectors
 */
public class VectorLoader implements AutoCloseable {
The intent was to be able to use this in other contexts than reading from a file.
re-added
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFileReader.class);

public static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.UTF_8);
since this is mutable it should not be public.
If it needs to be accessed from somewhere else, you can add helper methods in this class that perform the operations needed (comparing to data somewhere?).
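A hedged sketch of that encapsulation (helper names are illustrative):

private static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.UTF_8);

public static int magicLength() {
  return MAGIC.length;
}

// Callers compare bytes against the magic without ever seeing the mutable array.
public static boolean validateMagic(byte[] data, int offset) {
  for (int i = 0; i < MAGIC.length; i++) {
    if (data[offset + i] != MAGIC[i]) {
      return false;
    }
  }
  return true;
}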
fixed
@@ -47,6 +47,17 @@ public VectorSchemaRoot(FieldVector parent) {
  }
}

public VectorSchemaRoot(List<Field> fields, List<FieldVector> fieldVectors) {
possibly this is the only constructor we should have in VectorSchemaRoot (+ rowCount if vectors have data in them)
Then we would have separate (static?) methods to create a VectorSchemaRoot either from a Schema (when reading a file) or from existing vectors (when writing a file)
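A hedged sketch of that shape (the createVector helper on Field is an assumption):

// Reading path: allocate empty vectors from a Schema.
public static VectorSchemaRoot create(Schema schema, BufferAllocator allocator) {
  List<FieldVector> vectors = new ArrayList<>();
  for (Field field : schema.getFields()) {
    vectors.add(field.createVector(allocator)); // assumed: empty vector from a Field
  }
  return new VectorSchemaRoot(schema.getFields(), vectors);
}

// Writing path: wrap existing, already-populated vectors.
public static VectorSchemaRoot of(List<Field> fields, List<FieldVector> vectors, int rowCount) {
  VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
  root.setRowCount(rowCount); // setRowCount exists per the discussion above
  return root;
}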
private ValueVector indices;
private Dictionary dictionary;
private final FieldVector indices;
meaning a regular ValueVector
public abstract class ArrowReader<T extends ReadChannel> implements AutoCloseable {

  private final T in;
yes 2 spaces. Sorry for being unclear.
  dictionaries.put(dictionaryEncoding.getId(), dictionaryVector);
}
// create index vector
ArrowType dictionaryType = new ArrowType.Int(32, true); // TODO check actual index type
I think it's fine to start with just one size. just throw UnsupportedOperationException if someone tries to use another type.
* Schema is modified in VectorLoader/Unloader to conform to message format
* Dictionary IDs will be assigned at that time if not predefined
* Stream reader must check for message type being read (dictionary or regular batch)
* VectorLoader now creates the VectorSchemaRoot, instead of it being passed in
Creating base class for stream/file writer
Creating base class with visitors for arrow messages
Indentation fixes
Other cleanup
…dictionary encoding to fields, restoring vector loader/unloader
@julienledem @wesm I've gone through another round of refactoring. Main changes since last time:
There's still some cleanup to do, but I'd appreciate your thoughts on the current approach. Thanks,
@wesm I added a test for a vector of
@wesm I think I know what the issue is - ran into it with the echo endpoint - let me see if I can push a fix
  break;
} else {
  writer.writeBatch();
reader.loadNextBatch();
I don't think it's a blocker, but having these two classes "wired" together through a mutable row batch (the root) seems like a bad code smell. It might be worth eventually changing the API to be:
reader.loadNextBatch(root)
writer.writeBatch(root)
I don't disagree... It's complicated by the fact that both the reader and writer have to manipulate the schema to account for dictionaries.
If you had an array or list of in-memory record batches in Java, can you write them with this API? I was able to add dictionaries to the C++ stream reader/writers without modifying the stream / file API, and schemas are immutable. I think it would be worth working toward that eventually, maybe we should open some JIRAs to cover
- Making Java schemas immutable
- Looser coupling between the reader and writer classes here
The writeRecordBatch method is still there, but it's protected now. Is there a use case for creating batches outside the VectorUnloader? I consolidated the logic so that the file-reading/writing API was a lot simpler, but the underlying logic is the same.
The problem with schemas is that the java api doesn't represent things the same as the message format. In java, the vector type is the actual type of the vector, whereas in the message format it's the logical type. So in java a dictionary encoded varchar vector is type Int, but in the message format it's type varchar.
I think reading and then writing to another source seems like kind of an artificial use case that we're using for tests. The general workflow for writing is:
- create a VectorSchemaRoot object with your vectors
- create a writer with the VectorSchemaRoot and an output stream
- modify your vectors, call writer.writeBatch, repeat
For reading it's:
- create a reader from an input stream - this will construct the VectorSchemaRoot object based on the schema read
- call reader.loadBatch, do something with the vectors loaded into the VectorSchemaRoot, repeat
Since they both use VectorSchemaRoot for the loaded vectors, it is easy to couple them together for the read/write.
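As a hedged end-to-end sketch of those two workflows (constructor shapes and the fillVectors/process helpers are assumptions, not this patch's exact API):

// Writing: the VectorSchemaRoot is the unit of currency; each writeBatch snapshots it.
try (ArrowFileWriter writer = new ArrowFileWriter(root, fileChannel)) { // assumed constructor
  writer.start();
  for (int i = 0; i < numBatches; i++) {
    fillVectors(root, i); // hypothetical: mutate the vectors, set the row count
    writer.writeBatch();
  }
  writer.end();
}

// Reading: the reader builds the root from the schema it reads off the stream.
try (ArrowStreamReader reader = new ArrowStreamReader(inputStream, allocator)) { // assumed
  VectorSchemaRoot root = reader.getVectorSchemaRoot(); // assumed accessor
  while (reader.loadNextBatch() > 0) { // returns the number of records loaded, per the thread
    process(root); // hypothetical: consume the vectors before the next load replaces them
  }
}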
probably a question for @julienledem -- at least in C++, a basic unit of currency is std::vector<std::shared_ptr<RecordBatch>>, so to be unable to accumulate record batches in some kind of collection (like a std::deque or std::vector), particularly in a concurrent context, and write them out, would be very limiting.
With this API, I am not sure how you would do concurrent reads and writes of record batches. I don't think there's anything to be done right now, but I think we should open JIRAs at least pointing out the potential problems
Hmm, how would you do concurrent reads/writes anyway? Wouldn't you have to read/write from/to the underlying streams?
Reading from one stream and writing to a different one (e.g. read from a socket, write to a file on disk)
there are still methods to directly read/write the RecordBatches, they are just protected access. Possibly extracting out the lower-level methods into a subclass would be useful, so that they could be used directly if wanted but bypassed if not.
+1 on having the schema immutable.
On the Java side, here is what we expect:
- there's a single writer for a batch.
- once the writer is done writing (usually ending by calling setRowCount()) there can be multiple concurrent readers.
- as one of the goals is controlling how much memory is in use, the RecordBatch acts as an iterator. You load a RecordBatch, then work with it, then load the next one once you're done. With a fixed width schema, no re-allocation needs to happen.
Agreed we could improve the APIs around this.
Integration tests still fail for me with the latest changes
I can try to help with your C++ build, are you using at least gcc 4.8? This part of the codebase doesn't work on Windows yet
thanks, I'm on a pretty vanilla ubuntu with gcc 5.4.0:
I installed cmake through
Looks like we need to add boost installation instructions to the README. Here are the package requirements we're using in Travis
should work
thanks, I've got it running now
I'm looking at the C++ stream reader. It may not be dealing with a 0 control message properly, is that new?
It was there before but I'm not sure it was always being sent. Should I just remove it?
That's the problem then -- @nongli and I discussed having this in the format so it's fine to keep sending it. Let me put up a commit for you to cherry pick
great, thanks
see wesm@c2f216a - the integration tests still fail, but for a different reason:
this seems like an unloading/loading problem. Let me attach the JSON file that causes this issue -- you can see the command lines to run in that output
ah, it's failing on the struct_example.json which is in the repo
Change-Id: I770e7400d9a4eab32086c0a0f3b92b0a65c8c0e1
Thanks, I'm able to reproduce the issue now
@wesm I think I found the problem - BitVector wasn't loading itself correctly. Not entirely sure why this wasn't a problem before...
@elahrvivaz ah, don't worry about the C++ unit tests. I need to make the
Cool, integration tests passing for me locally!
woot!
The OS X build timed out, but the rest passed. I'm merging. Thanks a lot for your work on this!!
awesome, thanks!
oops, there's a java/tools/tmptestfilesio, I'll open a patch to remove
WIP for comments