From d6ddcb2cd6764c92c77c0cc61f0ac76c21d9c83f Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 3 Feb 2022 17:01:49 -0500 Subject: [PATCH] Add partitioned_csv setup code to sql_integration test --- datafusion/src/execution/context.rs | 239 +----------------------- datafusion/tests/sql/mod.rs | 1 + datafusion/tests/sql/partitioned_csv.rs | 95 ++++++++++ datafusion/tests/sql/projection.rs | 192 +++++++++++++++++++ datafusion/tests/sql/select.rs | 59 +++++- 5 files changed, 347 insertions(+), 239 deletions(-) create mode 100644 datafusion/tests/sql/partitioned_csv.rs diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index fb271a1a7e56..96e49c800f48 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1281,11 +1281,9 @@ mod tests { use super::*; use crate::execution::context::QueryPlanner; use crate::from_slice::FromSlice; - use crate::logical_plan::plan::Projection; - use crate::logical_plan::TableScan; use crate::logical_plan::{binary_expr, lit, Operator}; + use crate::physical_plan::collect; use crate::physical_plan::functions::{make_scalar_function, Volatility}; - use crate::physical_plan::{collect, collect_partitioned}; use crate::test; use crate::variable::VarType; use crate::{ @@ -1311,7 +1309,6 @@ mod tests { use std::thread::{self, JoinHandle}; use std::{io::prelude::*, sync::Mutex}; use tempfile::TempDir; - use test::*; #[tokio::test] async fn shared_memory_and_disk_manager() { @@ -1347,62 +1344,6 @@ mod tests { )); } - #[tokio::test] - async fn parallel_projection() -> Result<()> { - let partition_count = 4; - let results = execute("SELECT c1, c2 FROM test", partition_count).await?; - - let expected = vec![ - "+----+----+", - "| c1 | c2 |", - "+----+----+", - "| 3 | 1 |", - "| 3 | 2 |", - "| 3 | 3 |", - "| 3 | 4 |", - "| 3 | 5 |", - "| 3 | 6 |", - "| 3 | 7 |", - "| 3 | 8 |", - "| 3 | 9 |", - "| 3 | 10 |", - "| 2 | 1 |", - "| 2 | 2 |", - "| 2 | 3 |", - "| 2 | 4 |", - "| 2 | 5 |", - "| 2 | 6 |", - "| 2 | 7 |", - "| 2 | 8 |", - "| 2 | 9 |", - "| 2 | 10 |", - "| 1 | 1 |", - "| 1 | 2 |", - "| 1 | 3 |", - "| 1 | 4 |", - "| 1 | 5 |", - "| 1 | 6 |", - "| 1 | 7 |", - "| 1 | 8 |", - "| 1 | 9 |", - "| 1 | 10 |", - "| 0 | 1 |", - "| 0 | 2 |", - "| 0 | 3 |", - "| 0 | 4 |", - "| 0 | 5 |", - "| 0 | 6 |", - "| 0 | 7 |", - "| 0 | 8 |", - "| 0 | 9 |", - "| 0 | 10 |", - "+----+----+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - #[tokio::test] async fn create_variable_expr() -> Result<()> { let tmp_dir = TempDir::new()?; @@ -1447,184 +1388,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn parallel_query_with_filter() -> Result<()> { - let tmp_dir = TempDir::new()?; - let partition_count = 4; - let ctx = create_ctx(&tmp_dir, partition_count).await?; - - let logical_plan = - ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?; - let logical_plan = ctx.optimize(&logical_plan)?; - - let physical_plan = ctx.create_physical_plan(&logical_plan).await?; - - let runtime = ctx.state.lock().runtime_env.clone(); - let results = collect_partitioned(physical_plan, runtime).await?; - - // note that the order of partitions is not deterministic - let mut num_rows = 0; - for partition in &results { - for batch in partition { - num_rows += batch.num_rows(); - } - } - assert_eq!(20, num_rows); - - let results: Vec = results.into_iter().flatten().collect(); - let expected = vec![ - "+----+----+", - "| c1 | c2 |", - "+----+----+", - "| 1 | 1 |", - "| 1 | 10 |", - "| 1 | 2 |", - "| 1 | 3 |", - "| 1 | 4 |", - "| 1 | 5 |", - "| 1 | 6 |", - "| 1 | 7 |", - "| 1 | 8 |", - "| 1 | 9 |", - "| 2 | 1 |", - "| 2 | 10 |", - "| 2 | 2 |", - "| 2 | 3 |", - "| 2 | 4 |", - "| 2 | 5 |", - "| 2 | 6 |", - "| 2 | 7 |", - "| 2 | 8 |", - "| 2 | 9 |", - "+----+----+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) - } - - #[tokio::test] - async fn projection_on_table_scan() -> Result<()> { - let tmp_dir = TempDir::new()?; - let partition_count = 4; - let ctx = create_ctx(&tmp_dir, partition_count).await?; - let runtime = ctx.state.lock().runtime_env.clone(); - - let table = ctx.table("test")?; - let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()) - .project(vec![col("c2")])? - .build()?; - - let optimized_plan = ctx.optimize(&logical_plan)?; - match &optimized_plan { - LogicalPlan::Projection(Projection { input, .. }) => match &**input { - LogicalPlan::TableScan(TableScan { - source, - projected_schema, - .. - }) => { - assert_eq!(source.schema().fields().len(), 3); - assert_eq!(projected_schema.fields().len(), 1); - } - _ => panic!("input to projection should be TableScan"), - }, - _ => panic!("expect optimized_plan to be projection"), - } - - let expected = "Projection: #test.c2\ - \n TableScan: test projection=Some([1])"; - assert_eq!(format!("{:?}", optimized_plan), expected); - - let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; - - assert_eq!(1, physical_plan.schema().fields().len()); - assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); - - let batches = collect(physical_plan, runtime).await?; - assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::()); - - Ok(()) - } - - #[tokio::test] - async fn preserve_nullability_on_projection() -> Result<()> { - let tmp_dir = TempDir::new()?; - let ctx = create_ctx(&tmp_dir, 1).await?; - - let schema: Schema = ctx.table("test").unwrap().schema().clone().into(); - assert!(!schema.field_with_name("c1")?.is_nullable()); - - let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)? - .project(vec![col("c1")])? - .build()?; - - let plan = ctx.optimize(&plan)?; - let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?; - assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable()); - Ok(()) - } - - #[tokio::test] - async fn projection_on_memory_scan() -> Result<()> { - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - Field::new("c", DataType::Int32, false), - ]); - let schema = SchemaRef::new(schema); - - let partitions = vec![vec![RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])), - Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])), - Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])), - ], - )?]]; - - let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)? - .project(vec![col("b")])? - .build()?; - assert_fields_eq(&plan, vec!["b"]); - - let ctx = ExecutionContext::new(); - let optimized_plan = ctx.optimize(&plan)?; - match &optimized_plan { - LogicalPlan::Projection(Projection { input, .. }) => match &**input { - LogicalPlan::TableScan(TableScan { - source, - projected_schema, - .. - }) => { - assert_eq!(source.schema().fields().len(), 3); - assert_eq!(projected_schema.fields().len(), 1); - } - _ => panic!("input to projection should be InMemoryScan"), - }, - _ => panic!("expect optimized_plan to be projection"), - } - - let expected = format!( - "Projection: #{}.b\ - \n TableScan: {} projection=Some([1])", - UNNAMED_TABLE, UNNAMED_TABLE - ); - assert_eq!(format!("{:?}", optimized_plan), expected); - - let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; - - assert_eq!(1, physical_plan.schema().fields().len()); - assert_eq!("b", physical_plan.schema().field(0).name().as_str()); - - let runtime = ctx.state.lock().runtime_env.clone(); - let batches = collect(physical_plan, runtime).await?; - assert_eq!(1, batches.len()); - assert_eq!(1, batches[0].num_columns()); - assert_eq!(4, batches[0].num_rows()); - - Ok(()) - } - #[tokio::test] async fn sort() -> Result<()> { let results = diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs index 95623d45e467..468762ea05bb 100644 --- a/datafusion/tests/sql/mod.rs +++ b/datafusion/tests/sql/mod.rs @@ -98,6 +98,7 @@ pub mod window; mod explain; pub mod information_schema; +mod partitioned_csv; #[cfg_attr(not(feature = "unicode_expressions"), ignore)] pub mod unicode; diff --git a/datafusion/tests/sql/partitioned_csv.rs b/datafusion/tests/sql/partitioned_csv.rs new file mode 100644 index 000000000000..5efc837d5c95 --- /dev/null +++ b/datafusion/tests/sql/partitioned_csv.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility functions for running with a partitioned csv dataset: + +use std::{io::Write, sync::Arc}; + +use arrow::{ + datatypes::{DataType, Field, Schema, SchemaRef}, + record_batch::RecordBatch, +}; +use datafusion::{ + error::Result, + prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext}, +}; +use tempfile::TempDir; + +/// Execute SQL and return results +async fn plan_and_collect( + ctx: &mut ExecutionContext, + sql: &str, +) -> Result> { + ctx.sql(sql).await?.collect().await +} + +/// Execute SQL and return results +pub async fn execute(sql: &str, partition_count: usize) -> Result> { + let tmp_dir = TempDir::new()?; + let mut ctx = create_ctx(&tmp_dir, partition_count).await?; + plan_and_collect(&mut ctx, sql).await +} + +/// Generate CSV partitions within the supplied directory +fn populate_csv_partitions( + tmp_dir: &TempDir, + partition_count: usize, + file_extension: &str, +) -> Result { + // define schema for data source (csv file) + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::UInt32, false), + Field::new("c2", DataType::UInt64, false), + Field::new("c3", DataType::Boolean, false), + ])); + + // generate a partitioned file + for partition in 0..partition_count { + let filename = format!("partition-{}.{}", partition, file_extension); + let file_path = tmp_dir.path().join(&filename); + let mut file = std::fs::File::create(file_path)?; + + // generate some data + for i in 0..=10 { + let data = format!("{},{},{}\n", partition, i, i % 2 == 0); + file.write_all(data.as_bytes())?; + } + } + + Ok(schema) +} + +/// Generate a partitioned CSV file and register it with an execution context +pub async fn create_ctx( + tmp_dir: &TempDir, + partition_count: usize, +) -> Result { + let mut ctx = + ExecutionContext::with_config(ExecutionConfig::new().with_target_partitions(8)); + + let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?; + + // register csv file with the execution context + ctx.register_csv( + "test", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new().schema(&schema), + ) + .await?; + + Ok(ctx) +} diff --git a/datafusion/tests/sql/projection.rs b/datafusion/tests/sql/projection.rs index 57fa598bb754..0a956a9411eb 100644 --- a/datafusion/tests/sql/projection.rs +++ b/datafusion/tests/sql/projection.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE}; +use tempfile::TempDir; + use super::*; #[tokio::test] @@ -73,3 +76,192 @@ async fn csv_query_group_by_avg_with_projection() -> Result<()> { assert_batches_sorted_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn parallel_projection() -> Result<()> { + let partition_count = 4; + let results = + partitioned_csv::execute("SELECT c1, c2 FROM test", partition_count).await?; + + let expected = vec![ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 3 | 1 |", + "| 3 | 2 |", + "| 3 | 3 |", + "| 3 | 4 |", + "| 3 | 5 |", + "| 3 | 6 |", + "| 3 | 7 |", + "| 3 | 8 |", + "| 3 | 9 |", + "| 3 | 10 |", + "| 2 | 1 |", + "| 2 | 2 |", + "| 2 | 3 |", + "| 2 | 4 |", + "| 2 | 5 |", + "| 2 | 6 |", + "| 2 | 7 |", + "| 2 | 8 |", + "| 2 | 9 |", + "| 2 | 10 |", + "| 1 | 1 |", + "| 1 | 2 |", + "| 1 | 3 |", + "| 1 | 4 |", + "| 1 | 5 |", + "| 1 | 6 |", + "| 1 | 7 |", + "| 1 | 8 |", + "| 1 | 9 |", + "| 1 | 10 |", + "| 0 | 1 |", + "| 0 | 2 |", + "| 0 | 3 |", + "| 0 | 4 |", + "| 0 | 5 |", + "| 0 | 6 |", + "| 0 | 7 |", + "| 0 | 8 |", + "| 0 | 9 |", + "| 0 | 10 |", + "+----+----+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn projection_on_table_scan() -> Result<()> { + let tmp_dir = TempDir::new()?; + let partition_count = 4; + let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?; + let runtime = ctx.state.lock().runtime_env.clone(); + + let table = ctx.table("test")?; + let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan()) + .project(vec![col("c2")])? + .build()?; + + let optimized_plan = ctx.optimize(&logical_plan)?; + match &optimized_plan { + LogicalPlan::Projection(Projection { input, .. }) => match &**input { + LogicalPlan::TableScan(TableScan { + source, + projected_schema, + .. + }) => { + assert_eq!(source.schema().fields().len(), 3); + assert_eq!(projected_schema.fields().len(), 1); + } + _ => panic!("input to projection should be TableScan"), + }, + _ => panic!("expect optimized_plan to be projection"), + } + + let expected = "Projection: #test.c2\ + \n TableScan: test projection=Some([1])"; + assert_eq!(format!("{:?}", optimized_plan), expected); + + let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; + + assert_eq!(1, physical_plan.schema().fields().len()); + assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); + + let batches = collect(physical_plan, runtime).await?; + assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::()); + + Ok(()) +} + +#[tokio::test] +async fn preserve_nullability_on_projection() -> Result<()> { + let tmp_dir = TempDir::new()?; + let ctx = partitioned_csv::create_ctx(&tmp_dir, 1).await?; + + let schema: Schema = ctx.table("test").unwrap().schema().clone().into(); + assert!(!schema.field_with_name("c1")?.is_nullable()); + + let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)? + .project(vec![col("c1")])? + .build()?; + + let plan = ctx.optimize(&plan)?; + let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?; + assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable()); + Ok(()) +} + +#[tokio::test] +async fn projection_on_memory_scan() -> Result<()> { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + let schema = SchemaRef::new(schema); + + let partitions = vec![vec![RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])), + Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])), + Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])), + ], + )?]]; + + let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)? + .project(vec![col("b")])? + .build()?; + assert_fields_eq(&plan, vec!["b"]); + + let ctx = ExecutionContext::new(); + let optimized_plan = ctx.optimize(&plan)?; + match &optimized_plan { + LogicalPlan::Projection(Projection { input, .. }) => match &**input { + LogicalPlan::TableScan(TableScan { + source, + projected_schema, + .. + }) => { + assert_eq!(source.schema().fields().len(), 3); + assert_eq!(projected_schema.fields().len(), 1); + } + _ => panic!("input to projection should be InMemoryScan"), + }, + _ => panic!("expect optimized_plan to be projection"), + } + + let expected = format!( + "Projection: #{}.b\ + \n TableScan: {} projection=Some([1])", + UNNAMED_TABLE, UNNAMED_TABLE + ); + assert_eq!(format!("{:?}", optimized_plan), expected); + + let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; + + assert_eq!(1, physical_plan.schema().fields().len()); + assert_eq!("b", physical_plan.schema().field(0).name().as_str()); + + let runtime = ctx.state.lock().runtime_env.clone(); + let batches = collect(physical_plan, runtime).await?; + assert_eq!(1, batches.len()); + assert_eq!(1, batches[0].num_columns()); + assert_eq!(4, batches[0].num_rows()); + + Ok(()) +} + +fn assert_fields_eq(plan: &LogicalPlan, expected: Vec<&str>) { + let actual: Vec = plan + .schema() + .fields() + .iter() + .map(|f| f.name().clone()) + .collect(); + assert_eq!(actual, expected); +} diff --git a/datafusion/tests/sql/select.rs b/datafusion/tests/sql/select.rs index 759a45c9fca9..02869dd99d2b 100644 --- a/datafusion/tests/sql/select.rs +++ b/datafusion/tests/sql/select.rs @@ -16,7 +16,8 @@ // under the License. use super::*; -use datafusion::from_slice::FromSlice; +use datafusion::{from_slice::FromSlice, physical_plan::collect_partitioned}; +use tempfile::TempDir; #[tokio::test] async fn all_where_empty() -> Result<()> { @@ -928,3 +929,59 @@ async fn csv_select_nested() -> Result<()> { assert_batches_eq!(expected, &actual); Ok(()) } + +#[tokio::test] +async fn parallel_query_with_filter() -> Result<()> { + let tmp_dir = TempDir::new()?; + let partition_count = 4; + let ctx = partitioned_csv::create_ctx(&tmp_dir, partition_count).await?; + + let logical_plan = + ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?; + let logical_plan = ctx.optimize(&logical_plan)?; + + let physical_plan = ctx.create_physical_plan(&logical_plan).await?; + + let runtime = ctx.state.lock().runtime_env.clone(); + let results = collect_partitioned(physical_plan, runtime).await?; + + // note that the order of partitions is not deterministic + let mut num_rows = 0; + for partition in &results { + for batch in partition { + num_rows += batch.num_rows(); + } + } + assert_eq!(20, num_rows); + + let results: Vec = results.into_iter().flatten().collect(); + let expected = vec![ + "+----+----+", + "| c1 | c2 |", + "+----+----+", + "| 1 | 1 |", + "| 1 | 10 |", + "| 1 | 2 |", + "| 1 | 3 |", + "| 1 | 4 |", + "| 1 | 5 |", + "| 1 | 6 |", + "| 1 | 7 |", + "| 1 | 8 |", + "| 1 | 9 |", + "| 2 | 1 |", + "| 2 | 10 |", + "| 2 | 2 |", + "| 2 | 3 |", + "| 2 | 4 |", + "| 2 | 5 |", + "| 2 | 6 |", + "| 2 | 7 |", + "| 2 | 8 |", + "| 2 | 9 |", + "+----+----+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +}