Update to arrow 32 and switch to RawDecoder for JSON #5056
Conversation
Switch to RawDecoder for JSON
Ok(futures::stream::iter(reader).boxed())
}
GetResult::Stream(s) => {
    let mut decoder = RawDecoder::try_new(schema, batch_size)?;
I think this interface is pretty cool: it avoids needing to scan the byte stream looking for newlines, and consequently should add some additional performance on top of the already faster RawDecoder in general.

🥳 🦜
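For readers skimming this thread, here is a minimal sketch of the push-based loop being praised. It assumes the arrow 32 era RawDecoder contract (decode returns the number of bytes consumed and stops once batch_size rows are buffered; flush drains the buffered rows); it is an illustration, not the exact DataFusion code.

use arrow::error::ArrowError;
use arrow::json::RawDecoder; // assumed arrow 32 API; renamed in later releases
use arrow::record_batch::RecordBatch;

// Feed whole network/object-store chunks straight to the decoder: it tracks
// record boundaries internally, so the caller never scans for newlines.
fn push_chunk(
    decoder: &mut RawDecoder,
    mut buf: &[u8],
    out: &mut Vec<RecordBatch>,
) -> Result<(), ArrowError> {
    while !buf.is_empty() {
        let consumed = decoder.decode(buf)?;
        if consumed == 0 {
            // a full batch is buffered; drain it, then keep decoding
            match decoder.flush()? {
                Some(batch) => out.push(batch),
                // defensive: no progress and nothing buffered
                None => break,
            }
            continue;
        }
        buf = &buf[consumed..];
    }
    Ok(())
}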
Looks great @tustvold -- the only question I have is on feeding in empty buffers to the csv reader -- but perhaps I am misreading something
// walk the next level
root_error = source;
// remember the lowest datafusion error so far
if let Some(e) = root_error.downcast_ref::<DataFusionError>() {
    last_datafusion_error = e;
} else if let Some(e) = root_error.downcast_ref::<Arc<DataFusionError>>() {
    // As `Arc<T>::source()` calls through to `T::source()` we need to
👍
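As context for the Arc special case above: when the concrete type behind the dyn Error is Arc<DataFusionError> rather than DataFusionError itself, a direct downcast returns None, so both types must be tried. A minimal illustration follows (a hypothetical helper, not the committed code; DataFusionError here is datafusion::error::DataFusionError):

use std::error::Error;
use std::sync::Arc;

use datafusion::error::DataFusionError;

// A source stored as Arc<DataFusionError> does not downcast to
// DataFusionError directly; the Arc must be peeled off explicitly.
fn as_datafusion_error(e: &(dyn Error + 'static)) -> Option<&DataFusionError> {
    e.downcast_ref::<DataFusionError>()
        .or_else(|| e.downcast_ref::<Arc<DataFusionError>>().map(|e| e.as_ref()))
}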
@@ -217,6 +217,9 @@ fn default_field_name(dt: &DataType) -> &str {
     DataType::Union(_, _, _) => "union",
     DataType::Dictionary(_, _) => "map",
     DataType::Map(_, _) => unimplemented!("Map support not implemented"),
+    DataType::RunEndEncoded(_, _) => {
+        unimplemented!("RunEndEncoded support not implemented")
@@ -21,7 +21,6 @@ use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::{SessionState, TaskContext};
 use crate::physical_plan::expressions::PhysicalSortExpr;
-use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
Can the corresponding newline_delimited_stream module be deleted too?
https://github.com/search?q=repo%3Aapache%2Farrow-datafusion%20newline_delimited_stream&type=code
Unfortunately it is still used by the schema inference logic; I'll see about resurrecting the PR to move to using the upstream implementation
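For context, the module in question frames a byte stream into complete lines so that a newline-delimited parser never sees a partial record, which is why schema inference still depends on it. A rough synchronous sketch of the idea (the real module operates over an async byte stream):

// Buffer incoming chunks and emit only whole, newline-terminated frames,
// holding back a trailing partial line until more bytes arrive.
fn split_complete_lines(buffer: &mut Vec<u8>) -> Vec<Vec<u8>> {
    let mut frames = Vec::new();
    while let Some(pos) = buffer.iter().position(|b| *b == b'\n') {
        let mut frame: Vec<u8> = buffer.drain(..=pos).collect();
        frame.pop(); // drop the newline itself
        frames.push(frame);
    }
    frames
}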
}
let decoded = match decoder.decode(buffered.as_ref()) {
    // Note: the decoder needs to be called with an empty
    // array to delimt the final record
Suggested change:
-    // array to delimt the final record
+    // array to delimit the final record
I must be missing how the code is called with an empty buffer. If all data in buffered was consumed and then the next poll was empty, won't that break out of the loop prior to calling decode()? 🤔
You are quite correct; I'm investigating how this is working...
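To make the subtlety under investigation concrete, here is the termination step in isolation (a sketch under the same assumed arrow 32 decoder contract, not the code in this PR): once the stream is exhausted, an empty decode call is what delimits a trailing record with no terminator, and only then does the final flush drain it.

use arrow::error::ArrowError;
use arrow::json::RawDecoder; // assumed arrow 32 API
use arrow::record_batch::RecordBatch;

// Called once after the byte stream reports end-of-input. If the read loop
// breaks before reaching this point, the empty decode call never happens,
// which is exactly the concern raised in this thread.
fn finish(decoder: &mut RawDecoder, out: &mut Vec<RecordBatch>) -> Result<(), ArrowError> {
    decoder.decode(&[])?; // empty input delimits any final unterminated record
    if let Some(batch) = decoder.flush()? {
        out.push(batch); // emit the last, possibly partial, batch
    }
    Ok(())
}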
@@ -218,7 +218,7 @@ impl TryFrom<&DataType> for protobuf::arrow_type::ArrowTypeEnum {
     DataType::Decimal256(_, _) => {
         return Err(Error::General("Proto serialization error: The Decimal256 data type is not yet supported".to_owned()))
     }
-    DataType::Map(_, _) => {
+    DataType::Map(_, _) | DataType::RunEndEncoded(_, _) => {
I recommend either updating the error message here or adding a separate clause for RunEndEncoded
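Concretely, the separate clause could look like this (a sketch of the reviewer's suggestion, not the committed code):

DataType::RunEndEncoded(_, _) => {
    return Err(Error::General(
        "Proto serialization error: The RunEndEncoded data type is not yet supported".to_owned(),
    ))
}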
Benchmark runs are scheduled for baseline = a218b70 and contender = bb699eb. bb699eb is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #.
Rationale for this change
An integration test for apache/arrow-rs#3479, and preparation for the next arrow release (apache/arrow-rs#3584).
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?