From b928309d5099d1a9660440920a1377c5fe48a63e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 5 Jun 2022 13:21:12 -0700 Subject: [PATCH 1/3] Use IPC row count info --- arrow/src/ipc/reader.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 868098327092..ccb0dfbb047c 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -30,7 +30,7 @@ use crate::compute::cast; use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef, UnionMode}; use crate::error::{ArrowError, Result}; use crate::ipc; -use crate::record_batch::{RecordBatch, RecordBatchReader}; +use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader}; use ipc::CONTINUATION_MARKER; use DataType::*; @@ -608,6 +608,11 @@ pub fn read_record_batch( let mut node_index = 0; let mut arrays = vec![]; + let options = RecordBatchOptions { + match_field_names: true, + row_count: Some(batch.length() as usize), + }; + if let Some(projection) = projection { // project fields for (idx, field) in schema.fields().iter().enumerate() { @@ -643,7 +648,11 @@ pub fn read_record_batch( } } - RecordBatch::try_new(Arc::new(schema.project(projection)?), arrays) + RecordBatch::try_new_with_options( + Arc::new(schema.project(projection)?), + arrays, + &options, + ) } else { // keep track of index as lists require more than one node for field in schema.fields() { @@ -661,7 +670,7 @@ pub fn read_record_batch( buffer_index = triple.2; arrays.push(triple.0); } - RecordBatch::try_new(schema, arrays) + RecordBatch::try_new_with_options(schema, arrays, &options) } } From 6b6f148dc36aae7d3ee5f08c5ad4ce0db76797ad Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 5 Jun 2022 13:27:07 -0700 Subject: [PATCH 2/3] Add test --- arrow/src/ipc/reader.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index ccb0dfbb047c..9100505e041d 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -1942,4 +1942,17 @@ mod tests { let output_batch = roundtrip_ipc_stream(&input_batch); assert_eq!(input_batch, output_batch); } + + #[test] + fn test_no_columns_batch() { + let schema = Arc::new(Schema::new(vec![])); + let options = RecordBatchOptions { + match_field_names: true, + row_count: Some(10), + }; + let input_batch = + RecordBatch::try_new_with_options(schema, vec![], &options).unwrap(); + let output_batch = roundtrip_ipc_stream(&input_batch); + assert_eq!(input_batch, output_batch); + } } From 377f7b099f40ad1ebdfe76d3fe9efdb598628fbc Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 6 Jun 2022 09:00:52 -0700 Subject: [PATCH 3/3] Update arrow/src/ipc/reader.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- arrow/src/ipc/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 9100505e041d..1ac519382199 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -609,8 +609,8 @@ pub fn read_record_batch( let mut arrays = vec![]; let options = RecordBatchOptions { - match_field_names: true, row_count: Some(batch.length() as usize), + ..Default::default() }; if let Some(projection) = projection {