diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 06eb8f79dada..f8e4889f0b7f 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -381,6 +381,7 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> { self.statistics_converter(column) .and_then(|c| c.row_group_null_counts(self.metadata_iter())) .ok() + .map(|counts| Arc::new(counts) as ArrayRef) } fn row_counts(&self, _column: &Column) -> Option { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index a2f17ca9b7a7..14d7bc2af42d 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -661,7 +661,7 @@ where /// of parquet page [`Index`]'es to an [`ArrayRef`] /// /// The returned Array is an [`UInt64Array`] -pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result +pub(crate) fn null_counts_page_statistics<'a, I>(iterator: I) -> Result where I: Iterator, { @@ -680,7 +680,7 @@ where _ => unimplemented!(), }); - Ok(Arc::new(UInt64Array::from_iter(iter))) + Ok(UInt64Array::from_iter(iter)) } /// Extracts Parquet statistics as Arrow arrays @@ -874,21 +874,22 @@ impl<'a> StatisticsConverter<'a> { /// Extract the null counts from row group statistics in [`RowGroupMetaData`] /// /// See docs on [`Self::row_group_mins`] for details - pub fn row_group_null_counts(&self, metadatas: I) -> Result + pub fn row_group_null_counts(&self, metadatas: I) -> Result where I: IntoIterator, { - let data_type = self.arrow_field.data_type(); - let Some(parquet_index) = self.parquet_index else { - return Ok(self.make_null_array(data_type, metadatas)); + let num_row_groups = metadatas.into_iter().count(); + return Ok(UInt64Array::from_iter( + std::iter::repeat(None).take(num_row_groups), + )); }; let null_counts = metadatas .into_iter() .map(|x| x.column(parquet_index).statistics()) .map(|s| s.map(|s| s.null_count())); - Ok(Arc::new(UInt64Array::from_iter(null_counts))) + Ok(UInt64Array::from_iter(null_counts)) } /// Extract the minimum values from Data Page statistics. @@ -1007,14 +1008,15 @@ impl<'a> StatisticsConverter<'a> { column_page_index: &ParquetColumnIndex, column_offset_index: &ParquetOffsetIndex, row_group_indices: I, - ) -> Result + ) -> Result where I: IntoIterator, { - let data_type = self.arrow_field.data_type(); - let Some(parquet_index) = self.parquet_index else { - return Ok(self.make_null_array(data_type, row_group_indices)); + let num_row_groups = row_group_indices.into_iter().count(); + return Ok(UInt64Array::from_iter( + std::iter::repeat(None).take(num_row_groups), + )); }; let iter = row_group_indices.into_iter().map(|rg_index| { @@ -1047,21 +1049,19 @@ impl<'a> StatisticsConverter<'a> { pub fn data_page_row_counts( &self, column_offset_index: &ParquetOffsetIndex, - row_group_metadatas: &[RowGroupMetaData], + row_group_metadatas: &'a [RowGroupMetaData], row_group_indices: I, - ) -> Result + ) -> Result> where I: IntoIterator, { - let data_type = self.arrow_field.data_type(); - let Some(parquet_index) = self.parquet_index else { - return Ok(self.make_null_array(data_type, row_group_indices)); + // no matching column found in parquet_index; + // thus we cannot extract page_locations in order to determine + // the row count on a per DataPage basis. + return Ok(None); }; - // `offset_index[row_group_number][column_number][page_number]` holds - // the [`PageLocation`] corresponding to page `page_number` of column - // `column_number`of row group `row_group_number`. let mut row_count_total = Vec::new(); for rg_idx in row_group_indices { let page_locations = &column_offset_index[*rg_idx][parquet_index]; @@ -1070,9 +1070,8 @@ impl<'a> StatisticsConverter<'a> { Some(loc[1].first_row_index as u64 - loc[0].first_row_index as u64) }); - let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows(); - // append the last page row count + let num_rows_in_row_group = &row_group_metadatas[*rg_idx].num_rows(); let row_count_per_page = row_count_per_page .chain(std::iter::once(Some( *num_rows_in_row_group as u64 @@ -1083,7 +1082,7 @@ impl<'a> StatisticsConverter<'a> { row_count_total.extend(row_count_per_page); } - Ok(Arc::new(UInt64Array::from_iter(row_count_total))) + Ok(Some(UInt64Array::from_iter(row_count_total))) } /// Returns a null array of data_type with one element per row group diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 87bd1372225f..cd0efc8d3525 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -201,7 +201,7 @@ struct Test<'a> { expected_min: ArrayRef, expected_max: ArrayRef, expected_null_counts: UInt64Array, - expected_row_counts: UInt64Array, + expected_row_counts: Option, /// Which column to extract statistics from column_name: &'static str, /// What statistics should be checked? @@ -210,6 +210,28 @@ struct Test<'a> { impl<'a> Test<'a> { fn run(self) { + let converter = StatisticsConverter::try_new( + self.column_name, + self.reader.schema(), + self.reader.parquet_schema(), + ) + .unwrap(); + + self.run_checks(converter); + } + + fn run_with_schema(self, schema: &Schema) { + let converter = StatisticsConverter::try_new( + self.column_name, + schema, + self.reader.parquet_schema(), + ) + .unwrap(); + + self.run_checks(converter); + } + + fn run_checks(self, converter: StatisticsConverter) { let Self { reader, expected_min, @@ -220,15 +242,7 @@ impl<'a> Test<'a> { check, } = self; - let converter = StatisticsConverter::try_new( - column_name, - reader.schema(), - reader.parquet_schema(), - ) - .unwrap(); - let row_groups = reader.metadata().row_groups(); - let expected_null_counts = Arc::new(expected_null_counts) as ArrayRef; if check.data_page() { let column_page_index = reader @@ -241,11 +255,7 @@ impl<'a> Test<'a> { .offset_index() .expect("File should have column offset indices"); - let row_group_indices = row_groups - .iter() - .enumerate() - .map(|(i, _)| i) - .collect::>(); + let row_group_indices: Vec<_> = (0..row_groups.len()).collect(); let min = converter .data_page_mins( @@ -288,10 +298,8 @@ impl<'a> Test<'a> { let row_counts = converter .data_page_row_counts(column_offset_index, row_groups, &row_group_indices) .unwrap(); - // https://github.com/apache/datafusion/issues/10926 - let expected_row_counts: ArrayRef = Arc::new(expected_row_counts.clone()); assert_eq!( - &row_counts, &expected_row_counts, + row_counts, expected_row_counts, "{column_name}: Mismatch with expected row counts. \ Actual: {row_counts:?}. Expected: {expected_row_counts:?}" ); @@ -321,6 +329,7 @@ impl<'a> Test<'a> { reader.metadata().row_groups().iter(), ) .unwrap(); + let row_counts = Some(row_counts); assert_eq!( row_counts, expected_row_counts, "{column_name}: Mismatch with expected row counts. \ @@ -377,7 +386,7 @@ async fn test_one_row_group_without_null() { // no nulls expected_null_counts: UInt64Array::from(vec![0]), // 3 rows - expected_row_counts: UInt64Array::from(vec![3]), + expected_row_counts: Some(UInt64Array::from(vec![3])), column_name: "i64", check: Check::RowGroup, } @@ -404,7 +413,7 @@ async fn test_one_row_group_with_null_and_negative() { // 2 nulls expected_null_counts: UInt64Array::from(vec![2]), // 8 rows - expected_row_counts: UInt64Array::from(vec![8]), + expected_row_counts: Some(UInt64Array::from(vec![8])), column_name: "i64", check: Check::RowGroup, } @@ -431,7 +440,7 @@ async fn test_two_row_group_with_null() { // nulls are [0, 2] expected_null_counts: UInt64Array::from(vec![0, 2]), // row counts are [10, 5] - expected_row_counts: UInt64Array::from(vec![10, 5]), + expected_row_counts: Some(UInt64Array::from(vec![10, 5])), column_name: "i64", check: Check::RowGroup, } @@ -458,7 +467,7 @@ async fn test_two_row_groups_with_all_nulls_in_one() { // nulls are [1, 3] expected_null_counts: UInt64Array::from(vec![1, 3]), // row counts are [5, 3] - expected_row_counts: UInt64Array::from(vec![5, 3]), + expected_row_counts: Some(UInt64Array::from(vec![5, 3])), column_name: "i64", check: Check::RowGroup, } @@ -489,7 +498,7 @@ async fn test_multiple_data_pages_nulls_and_negatives() { expected_min: Arc::new(Int64Array::from(vec![Some(-1), Some(3), Some(7), None])), expected_max: Arc::new(Int64Array::from(vec![Some(2), Some(6), Some(9), None])), expected_null_counts: UInt64Array::from(vec![0, 0, 1, 2]), - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 2]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 2])), column_name: "i64", check: Check::DataPage, } @@ -522,7 +531,7 @@ async fn test_int_64() { // nulls are [0, 0, 0, 0] expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "i64", check: Check::Both, } @@ -548,7 +557,7 @@ async fn test_int_32() { // nulls are [0, 0, 0, 0] expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "i32", check: Check::Both, } @@ -574,7 +583,7 @@ async fn test_int_16() { // nulls are [0, 0, 0, 0] expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "i16", check: Check::Both, } @@ -600,7 +609,7 @@ async fn test_int_8() { // nulls are [0, 0, 0, 0] expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "i8", check: Check::Both, } @@ -650,7 +659,7 @@ async fn test_timestamp() { // nulls are [1, 1, 1, 1] expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "nanos", check: Check::RowGroup, } @@ -679,7 +688,7 @@ async fn test_timestamp() { // nulls are [1, 1, 1, 1] expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "nanos_timezoned", check: Check::RowGroup, } @@ -701,7 +710,7 @@ async fn test_timestamp() { TimestampMicrosecondType::parse("2020-01-12T01:01:01"), ])), expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "micros", check: Check::RowGroup, } @@ -730,7 +739,7 @@ async fn test_timestamp() { // nulls are [1, 1, 1, 1] expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "micros_timezoned", check: Check::RowGroup, } @@ -752,7 +761,7 @@ async fn test_timestamp() { TimestampMillisecondType::parse("2020-01-12T01:01:01"), ])), expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "millis", check: Check::RowGroup, } @@ -781,7 +790,7 @@ async fn test_timestamp() { // nulls are [1, 1, 1, 1] expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "millis_timezoned", check: Check::RowGroup, } @@ -803,7 +812,7 @@ async fn test_timestamp() { TimestampSecondType::parse("2020-01-12T01:01:01"), ])), expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "seconds", check: Check::RowGroup, } @@ -832,7 +841,7 @@ async fn test_timestamp() { // nulls are [1, 1, 1, 1] expected_null_counts: UInt64Array::from(vec![1, 1, 1, 1]), // row counts are [5, 5, 5, 5] - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "seconds_timezoned", check: Check::RowGroup, } @@ -878,7 +887,7 @@ async fn test_timestamp_diff_rg_sizes() { // nulls are [1, 2, 1] expected_null_counts: UInt64Array::from(vec![1, 2, 1]), // row counts are [8, 8, 4] - expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "nanos", check: Check::RowGroup, } @@ -905,7 +914,7 @@ async fn test_timestamp_diff_rg_sizes() { // nulls are [1, 2, 1] expected_null_counts: UInt64Array::from(vec![1, 2, 1]), // row counts are [8, 8, 4] - expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "nanos_timezoned", check: Check::RowGroup, } @@ -925,7 +934,7 @@ async fn test_timestamp_diff_rg_sizes() { TimestampMicrosecondType::parse("2020-01-12T01:01:01"), ])), expected_null_counts: UInt64Array::from(vec![1, 2, 1]), - expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "micros", check: Check::RowGroup, } @@ -952,7 +961,7 @@ async fn test_timestamp_diff_rg_sizes() { // nulls are [1, 2, 1] expected_null_counts: UInt64Array::from(vec![1, 2, 1]), // row counts are [8, 8, 4] - expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "micros_timezoned", check: Check::RowGroup, } @@ -972,7 +981,7 @@ async fn test_timestamp_diff_rg_sizes() { TimestampMillisecondType::parse("2020-01-12T01:01:01"), ])), expected_null_counts: UInt64Array::from(vec![1, 2, 1]), - expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "millis", check: Check::RowGroup, } @@ -999,7 +1008,7 @@ async fn test_timestamp_diff_rg_sizes() { // nulls are [1, 2, 1] expected_null_counts: UInt64Array::from(vec![1, 2, 1]), // row counts are [8, 8, 4] - expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "millis_timezoned", check: Check::RowGroup, } @@ -1019,7 +1028,7 @@ async fn test_timestamp_diff_rg_sizes() { TimestampSecondType::parse("2020-01-12T01:01:01"), ])), expected_null_counts: UInt64Array::from(vec![1, 2, 1]), - expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "seconds", check: Check::RowGroup, } @@ -1046,7 +1055,7 @@ async fn test_timestamp_diff_rg_sizes() { // nulls are [1, 2, 1] expected_null_counts: UInt64Array::from(vec![1, 2, 1]), // row counts are [8, 8, 4] - expected_row_counts: UInt64Array::from(vec![8, 8, 4]), + expected_row_counts: Some(UInt64Array::from(vec![8, 8, 4])), column_name: "seconds_timezoned", check: Check::RowGroup, } @@ -1084,7 +1093,7 @@ async fn test_dates_32_diff_rg_sizes() { // nulls are [2, 2] expected_null_counts: UInt64Array::from(vec![2, 2]), // row counts are [13, 7] - expected_row_counts: UInt64Array::from(vec![13, 7]), + expected_row_counts: Some(UInt64Array::from(vec![13, 7])), column_name: "date32", check: Check::RowGroup, } @@ -1107,7 +1116,7 @@ async fn test_time32_second_diff_rg_sizes() { expected_min: Arc::new(Time32SecondArray::from(vec![18506, 18510, 18514, 18518])), expected_max: Arc::new(Time32SecondArray::from(vec![18509, 18513, 18517, 18521])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 4])), column_name: "second", check: Check::RowGroup, } @@ -1134,7 +1143,7 @@ async fn test_time32_millisecond_diff_rg_sizes() { 3600003, 3600007, 3600011, 3600015, ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 4])), column_name: "millisecond", check: Check::RowGroup, } @@ -1167,7 +1176,7 @@ async fn test_time64_microsecond_diff_rg_sizes() { 1234567890138, ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 4])), column_name: "microsecond", check: Check::RowGroup, } @@ -1200,7 +1209,7 @@ async fn test_time64_nanosecond_diff_rg_sizes() { 987654321012360, ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), // Assuming 1 null per row group for simplicity - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 4])), column_name: "nanosecond", check: Check::RowGroup, } @@ -1227,7 +1236,7 @@ async fn test_dates_64_diff_rg_sizes() { Date64Type::parse("2029-11-12"), ])), expected_null_counts: UInt64Array::from(vec![2, 2]), - expected_row_counts: UInt64Array::from(vec![13, 7]), + expected_row_counts: Some(UInt64Array::from(vec![13, 7])), column_name: "date64", check: Check::RowGroup, } @@ -1255,7 +1264,7 @@ async fn test_uint() { expected_min: Arc::new(UInt8Array::from(vec![0, 1, 4, 7, 251])), expected_max: Arc::new(UInt8Array::from(vec![3, 4, 6, 250, 254])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 4, 4])), column_name: "u8", check: Check::RowGroup, } @@ -1266,7 +1275,7 @@ async fn test_uint() { expected_min: Arc::new(UInt16Array::from(vec![0, 1, 4, 7, 251])), expected_max: Arc::new(UInt16Array::from(vec![3, 4, 6, 250, 254])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 4, 4])), column_name: "u16", check: Check::RowGroup, } @@ -1277,7 +1286,7 @@ async fn test_uint() { expected_min: Arc::new(UInt32Array::from(vec![0, 1, 4, 7, 251])), expected_max: Arc::new(UInt32Array::from(vec![3, 4, 6, 250, 254])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 4, 4])), column_name: "u32", check: Check::RowGroup, } @@ -1288,7 +1297,7 @@ async fn test_uint() { expected_min: Arc::new(UInt64Array::from(vec![0, 1, 4, 7, 251])), expected_max: Arc::new(UInt64Array::from(vec![3, 4, 6, 250, 254])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![4, 4, 4, 4, 4]), + expected_row_counts: Some(UInt64Array::from(vec![4, 4, 4, 4, 4])), column_name: "u64", check: Check::RowGroup, } @@ -1311,7 +1320,7 @@ async fn test_int32_range() { expected_min: Arc::new(Int32Array::from(vec![0])), expected_max: Arc::new(Int32Array::from(vec![300000])), expected_null_counts: UInt64Array::from(vec![0]), - expected_row_counts: UInt64Array::from(vec![4]), + expected_row_counts: Some(UInt64Array::from(vec![4])), column_name: "i", check: Check::RowGroup, } @@ -1334,7 +1343,7 @@ async fn test_uint32_range() { expected_min: Arc::new(UInt32Array::from(vec![0])), expected_max: Arc::new(UInt32Array::from(vec![300000])), expected_null_counts: UInt64Array::from(vec![0]), - expected_row_counts: UInt64Array::from(vec![4]), + expected_row_counts: Some(UInt64Array::from(vec![4])), column_name: "u", check: Check::RowGroup, } @@ -1356,7 +1365,7 @@ async fn test_numeric_limits_unsigned() { expected_min: Arc::new(UInt8Array::from(vec![u8::MIN, 100])), expected_max: Arc::new(UInt8Array::from(vec![100, u8::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "u8", check: Check::RowGroup, } @@ -1367,7 +1376,7 @@ async fn test_numeric_limits_unsigned() { expected_min: Arc::new(UInt16Array::from(vec![u16::MIN, 100])), expected_max: Arc::new(UInt16Array::from(vec![100, u16::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "u16", check: Check::RowGroup, } @@ -1378,7 +1387,7 @@ async fn test_numeric_limits_unsigned() { expected_min: Arc::new(UInt32Array::from(vec![u32::MIN, 100])), expected_max: Arc::new(UInt32Array::from(vec![100, u32::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "u32", check: Check::RowGroup, } @@ -1389,7 +1398,7 @@ async fn test_numeric_limits_unsigned() { expected_min: Arc::new(UInt64Array::from(vec![u64::MIN, 100])), expected_max: Arc::new(UInt64Array::from(vec![100, u64::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "u64", check: Check::RowGroup, } @@ -1411,7 +1420,7 @@ async fn test_numeric_limits_signed() { expected_min: Arc::new(Int8Array::from(vec![i8::MIN, -100])), expected_max: Arc::new(Int8Array::from(vec![100, i8::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "i8", check: Check::RowGroup, } @@ -1422,7 +1431,7 @@ async fn test_numeric_limits_signed() { expected_min: Arc::new(Int16Array::from(vec![i16::MIN, -100])), expected_max: Arc::new(Int16Array::from(vec![100, i16::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "i16", check: Check::RowGroup, } @@ -1433,7 +1442,7 @@ async fn test_numeric_limits_signed() { expected_min: Arc::new(Int32Array::from(vec![i32::MIN, -100])), expected_max: Arc::new(Int32Array::from(vec![100, i32::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "i32", check: Check::RowGroup, } @@ -1444,7 +1453,7 @@ async fn test_numeric_limits_signed() { expected_min: Arc::new(Int64Array::from(vec![i64::MIN, -100])), expected_max: Arc::new(Int64Array::from(vec![100, i64::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "i64", check: Check::RowGroup, } @@ -1466,7 +1475,7 @@ async fn test_numeric_limits_float() { expected_min: Arc::new(Float32Array::from(vec![f32::MIN, -100.0])), expected_max: Arc::new(Float32Array::from(vec![100.0, f32::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "f32", check: Check::RowGroup, } @@ -1477,7 +1486,7 @@ async fn test_numeric_limits_float() { expected_min: Arc::new(Float64Array::from(vec![f64::MIN, -100.0])), expected_max: Arc::new(Float64Array::from(vec![100.0, f64::MAX])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "f64", check: Check::RowGroup, } @@ -1488,7 +1497,7 @@ async fn test_numeric_limits_float() { expected_min: Arc::new(Float32Array::from(vec![-1.0, -100.0])), expected_max: Arc::new(Float32Array::from(vec![100.0, -100.0])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "f32_nan", check: Check::RowGroup, } @@ -1499,7 +1508,7 @@ async fn test_numeric_limits_float() { expected_min: Arc::new(Float64Array::from(vec![-1.0, -100.0])), expected_max: Arc::new(Float64Array::from(vec![100.0, -100.0])), expected_null_counts: UInt64Array::from(vec![0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "f64_nan", check: Check::RowGroup, } @@ -1522,7 +1531,7 @@ async fn test_float64() { expected_min: Arc::new(Float64Array::from(vec![-5.0, -4.0, -0.0, 5.0])), expected_max: Arc::new(Float64Array::from(vec![-1.0, 0.0, 4.0, 9.0])), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "f", check: Check::RowGroup, } @@ -1555,7 +1564,7 @@ async fn test_float16() { .collect::>(), )), expected_null_counts: UInt64Array::from(vec![0, 0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), column_name: "f", check: Check::RowGroup, } @@ -1586,7 +1595,7 @@ async fn test_decimal() { .unwrap(), ), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "decimal_col", check: Check::RowGroup, } @@ -1624,7 +1633,7 @@ async fn test_decimal_256() { .unwrap(), ), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "decimal256_col", check: Check::RowGroup, } @@ -1644,7 +1653,7 @@ async fn test_dictionary() { expected_min: Arc::new(StringArray::from(vec!["abc", "aaa"])), expected_max: Arc::new(StringArray::from(vec!["def", "fffff"])), expected_null_counts: UInt64Array::from(vec![1, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "string_dict_i8", check: Check::RowGroup, } @@ -1655,7 +1664,7 @@ async fn test_dictionary() { expected_min: Arc::new(StringArray::from(vec!["abc", "aaa"])), expected_max: Arc::new(StringArray::from(vec!["def", "fffff"])), expected_null_counts: UInt64Array::from(vec![1, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "string_dict_i32", check: Check::RowGroup, } @@ -1666,7 +1675,7 @@ async fn test_dictionary() { expected_min: Arc::new(Int64Array::from(vec![-100, 0])), expected_max: Arc::new(Int64Array::from(vec![0, 100])), expected_null_counts: UInt64Array::from(vec![1, 0]), - expected_row_counts: UInt64Array::from(vec![5, 2]), + expected_row_counts: Some(UInt64Array::from(vec![5, 2])), column_name: "int_dict_i8", check: Check::RowGroup, } @@ -1704,7 +1713,7 @@ async fn test_byte() { "all backends", ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "name", check: Check::RowGroup, } @@ -1724,7 +1733,7 @@ async fn test_byte() { "backend six", ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_string", check: Check::RowGroup, } @@ -1743,7 +1752,7 @@ async fn test_byte() { expected_min: Arc::new(BinaryArray::from(expected_service_binary_min_values)), expected_max: Arc::new(BinaryArray::from(expected_service_binary_max_values)), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_binary", check: Check::RowGroup, } @@ -1764,7 +1773,7 @@ async fn test_byte() { FixedSizeBinaryArray::try_from_iter(max_input.into_iter()).unwrap(), ), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_fixedsize", check: Check::RowGroup, } @@ -1785,7 +1794,7 @@ async fn test_byte() { expected_service_large_binary_max_values, )), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service_large_binary", check: Check::RowGroup, } @@ -1818,7 +1827,7 @@ async fn test_period_in_column_names() { "HTTP GET / DISPATCH", ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "name", check: Check::RowGroup, } @@ -1832,7 +1841,7 @@ async fn test_period_in_column_names() { "frontend", "frontend", "backend", ])), expected_null_counts: UInt64Array::from(vec![0, 0, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5])), column_name: "service.name", check: Check::RowGroup, } @@ -1856,7 +1865,7 @@ async fn test_boolean() { expected_min: Arc::new(BooleanArray::from(vec![false, false])), expected_max: Arc::new(BooleanArray::from(vec![true, false])), expected_null_counts: UInt64Array::from(vec![1, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5])), column_name: "bool", check: Check::RowGroup, } @@ -1883,7 +1892,7 @@ async fn test_struct() { expected_min: Arc::new(struct_array(vec![(Some(1), Some(6.0), Some(12.0))])), expected_max: Arc::new(struct_array(vec![(Some(2), Some(8.5), Some(14.0))])), expected_null_counts: UInt64Array::from(vec![0]), - expected_row_counts: UInt64Array::from(vec![3]), + expected_row_counts: Some(UInt64Array::from(vec![3])), column_name: "struct", check: Check::RowGroup, } @@ -1906,7 +1915,7 @@ async fn test_utf8() { expected_min: Arc::new(StringArray::from(vec!["a", "e"])), expected_max: Arc::new(StringArray::from(vec!["d", "i"])), expected_null_counts: UInt64Array::from(vec![1, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5])), column_name: "utf8", check: Check::RowGroup, } @@ -1918,7 +1927,7 @@ async fn test_utf8() { expected_min: Arc::new(LargeStringArray::from(vec!["a", "e"])), expected_max: Arc::new(LargeStringArray::from(vec!["d", "i"])), expected_null_counts: UInt64Array::from(vec![1, 0]), - expected_row_counts: UInt64Array::from(vec![5, 5]), + expected_row_counts: Some(UInt64Array::from(vec![5, 5])), column_name: "large_utf8", check: Check::RowGroup, } @@ -1944,7 +1953,7 @@ async fn test_missing_statistics() { expected_min: Arc::new(Int64Array::from(vec![None])), expected_max: Arc::new(Int64Array::from(vec![None])), expected_null_counts: UInt64Array::from(vec![None]), - expected_row_counts: UInt64Array::from(vec![3]), // stil has row count statistics + expected_row_counts: Some(UInt64Array::from(vec![3])), // stil has row count statistics column_name: "i64", check: Check::RowGroup, } @@ -1966,9 +1975,59 @@ async fn test_column_not_found() { expected_min: Arc::new(Int64Array::from(vec![18262, 18565])), expected_max: Arc::new(Int64Array::from(vec![18564, 21865])), expected_null_counts: UInt64Array::from(vec![2, 2]), - expected_row_counts: UInt64Array::from(vec![13, 7]), + expected_row_counts: Some(UInt64Array::from(vec![13, 7])), column_name: "not_a_column", check: Check::RowGroup, } .run_col_not_found(); } + +#[tokio::test] +async fn test_column_non_existent() { + // Create a schema with an additional column + // that will not have a matching parquet index + let schema = Arc::new(Schema::new(vec![ + Field::new("i8", DataType::Int8, true), + Field::new("i16", DataType::Int16, true), + Field::new("i32", DataType::Int32, true), + Field::new("i64", DataType::Int64, true), + Field::new("i_do_not_exist", DataType::Int64, true), + ])); + + let reader = TestReader { + scenario: Scenario::Int, + row_per_group: 5, + } + .build() + .await; + + Test { + reader: &reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Int64Array::from(vec![None, None, None, None])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int64Array::from(vec![None, None, None, None])), + // nulls are [0, 0, 0, 0] + expected_null_counts: UInt64Array::from(vec![None, None, None, None]), + // row counts are [5, 5, 5, 5] + expected_row_counts: Some(UInt64Array::from(vec![5, 5, 5, 5])), + column_name: "i_do_not_exist", + check: Check::RowGroup, + } + .run_with_schema(&schema); + + Test { + reader: &reader, + // mins are [-5, -4, 0, 5] + expected_min: Arc::new(Int64Array::from(vec![None, None, None, None])), + // maxes are [-1, 0, 4, 9] + expected_max: Arc::new(Int64Array::from(vec![None, None, None, None])), + // nulls are [0, 0, 0, 0] + expected_null_counts: UInt64Array::from(vec![None, None, None, None]), + // row counts are [5, 5, 5, 5] + expected_row_counts: None, + column_name: "i_do_not_exist", + check: Check::DataPage, + } + .run_with_schema(&schema); +}