Move more tests out of context.rs #1743

Merged
merged 1 commit on Feb 5, 2022
239 changes: 1 addition & 238 deletions datafusion/src/execution/context.rs
@@ -1281,11 +1281,9 @@ mod tests {
use super::*;
use crate::execution::context::QueryPlanner;
use crate::from_slice::FromSlice;
use crate::logical_plan::plan::Projection;
use crate::logical_plan::TableScan;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::collect;
use crate::physical_plan::functions::{make_scalar_function, Volatility};
use crate::physical_plan::{collect, collect_partitioned};
use crate::test;
use crate::variable::VarType;
use crate::{
@@ -1311,7 +1309,6 @@ mod tests {
use std::thread::{self, JoinHandle};
use std::{io::prelude::*, sync::Mutex};
use tempfile::TempDir;
use test::*;

#[tokio::test]
async fn shared_memory_and_disk_manager() {
@@ -1347,62 +1344,6 @@
));
}

#[tokio::test]
async fn parallel_projection() -> Result<()> {
Contributor Author commented:
This test and those below are just moved into the sql_integration test; a sketch of the moved form follows the test body below.

let partition_count = 4;
let results = execute("SELECT c1, c2 FROM test", partition_count).await?;

let expected = vec![
"+----+----+",
"| c1 | c2 |",
"+----+----+",
"| 3 | 1 |",
"| 3 | 2 |",
"| 3 | 3 |",
"| 3 | 4 |",
"| 3 | 5 |",
"| 3 | 6 |",
"| 3 | 7 |",
"| 3 | 8 |",
"| 3 | 9 |",
"| 3 | 10 |",
"| 2 | 1 |",
"| 2 | 2 |",
"| 2 | 3 |",
"| 2 | 4 |",
"| 2 | 5 |",
"| 2 | 6 |",
"| 2 | 7 |",
"| 2 | 8 |",
"| 2 | 9 |",
"| 2 | 10 |",
"| 1 | 1 |",
"| 1 | 2 |",
"| 1 | 3 |",
"| 1 | 4 |",
"| 1 | 5 |",
"| 1 | 6 |",
"| 1 | 7 |",
"| 1 | 8 |",
"| 1 | 9 |",
"| 1 | 10 |",
"| 0 | 1 |",
"| 0 | 2 |",
"| 0 | 3 |",
"| 0 | 4 |",
"| 0 | 5 |",
"| 0 | 6 |",
"| 0 | 7 |",
"| 0 | 8 |",
"| 0 | 9 |",
"| 0 | 10 |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}
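
A minimal sketch of what the moved test might look like under datafusion/tests/sql/, assuming it calls the partitioned_csv::execute helper added later in this PR; the moved code itself is not shown in this diff, so the module path is an assumption.

```rust
// Hypothetical moved form in the sql_integration suite (illustrative only).
use super::partitioned_csv; // assumes a sibling module under tests/sql/

#[tokio::test]
async fn parallel_projection() -> datafusion::error::Result<()> {
    // `execute` writes `partition_count` CSV files into a temp dir,
    // registers them as table `test`, and collects the query results.
    let partition_count = 4;
    let results =
        partitioned_csv::execute("SELECT c1, c2 FROM test", partition_count).await?;

    // 4 partitions x 10 data rows each; the first line of every generated
    // file is consumed as a CSV header by the default CsvReadOptions.
    let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
    assert_eq!(row_count, 40);
    Ok(())
}
```

In the real moved test, `assert_batches_sorted_eq!` would keep the original row-level assertions; the count here just keeps the sketch short.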

#[tokio::test]
async fn create_variable_expr() -> Result<()> {
let tmp_dir = TempDir::new()?;
@@ -1447,184 +1388,6 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn parallel_query_with_filter() -> Result<()> {
let tmp_dir = TempDir::new()?;
let partition_count = 4;
let ctx = create_ctx(&tmp_dir, partition_count).await?;

let logical_plan =
ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?;
let logical_plan = ctx.optimize(&logical_plan)?;

let physical_plan = ctx.create_physical_plan(&logical_plan).await?;

let runtime = ctx.state.lock().runtime_env.clone();
let results = collect_partitioned(physical_plan, runtime).await?;

// note that the order of partitions is not deterministic
let mut num_rows = 0;
for partition in &results {
for batch in partition {
num_rows += batch.num_rows();
}
}
assert_eq!(20, num_rows);

let results: Vec<RecordBatch> = results.into_iter().flatten().collect();
let expected = vec![
"+----+----+",
"| c1 | c2 |",
"+----+----+",
"| 1 | 1 |",
"| 1 | 10 |",
"| 1 | 2 |",
"| 1 | 3 |",
"| 1 | 4 |",
"| 1 | 5 |",
"| 1 | 6 |",
"| 1 | 7 |",
"| 1 | 8 |",
"| 1 | 9 |",
"| 2 | 1 |",
"| 2 | 10 |",
"| 2 | 2 |",
"| 2 | 3 |",
"| 2 | 4 |",
"| 2 | 5 |",
"| 2 | 6 |",
"| 2 | 7 |",
"| 2 | 8 |",
"| 2 | 9 |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &results);

Ok(())
}

#[tokio::test]
async fn projection_on_table_scan() -> Result<()> {
let tmp_dir = TempDir::new()?;
let partition_count = 4;
let ctx = create_ctx(&tmp_dir, partition_count).await?;
let runtime = ctx.state.lock().runtime_env.clone();

let table = ctx.table("test")?;
let logical_plan = LogicalPlanBuilder::from(table.to_logical_plan())
.project(vec![col("c2")])?
.build()?;

let optimized_plan = ctx.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
..
}) => {
assert_eq!(source.schema().fields().len(), 3);
assert_eq!(projected_schema.fields().len(), 1);
}
_ => panic!("input to projection should be TableScan"),
},
_ => panic!("expect optimized_plan to be projection"),
}

let expected = "Projection: #test.c2\
\n TableScan: test projection=Some([1])";
assert_eq!(format!("{:?}", optimized_plan), expected);

let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());

let batches = collect(physical_plan, runtime).await?;
assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());

Ok(())
}

#[tokio::test]
async fn preserve_nullability_on_projection() -> Result<()> {
let tmp_dir = TempDir::new()?;
let ctx = create_ctx(&tmp_dir, 1).await?;

let schema: Schema = ctx.table("test").unwrap().schema().clone().into();
assert!(!schema.field_with_name("c1")?.is_nullable());

let plan = LogicalPlanBuilder::scan_empty(None, &schema, None)?
.project(vec![col("c1")])?
.build()?;

let plan = ctx.optimize(&plan)?;
let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
Ok(())
}

#[tokio::test]
async fn projection_on_memory_scan() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]);
let schema = SchemaRef::new(schema);

let partitions = vec![vec![RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from_slice(&[1, 10, 10, 100])),
Arc::new(Int32Array::from_slice(&[2, 12, 12, 120])),
Arc::new(Int32Array::from_slice(&[3, 12, 12, 120])),
],
)?]];

let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
.project(vec![col("b")])?
.build()?;
assert_fields_eq(&plan, vec!["b"]);

let ctx = ExecutionContext::new();
let optimized_plan = ctx.optimize(&plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
source,
projected_schema,
..
}) => {
assert_eq!(source.schema().fields().len(), 3);
assert_eq!(projected_schema.fields().len(), 1);
}
_ => panic!("input to projection should be InMemoryScan"),
},
_ => panic!("expect optimized_plan to be projection"),
}

let expected = format!(
"Projection: #{}.b\
\n TableScan: {} projection=Some([1])",
UNNAMED_TABLE, UNNAMED_TABLE
);
assert_eq!(format!("{:?}", optimized_plan), expected);

let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("b", physical_plan.schema().field(0).name().as_str());

let runtime = ctx.state.lock().runtime_env.clone();
let batches = collect(physical_plan, runtime).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(4, batches[0].num_rows());

Ok(())
}

#[tokio::test]
async fn sort() -> Result<()> {
let results =
1 change: 1 addition & 0 deletions datafusion/tests/sql/mod.rs
@@ -98,6 +98,7 @@ pub mod window;

mod explain;
pub mod information_schema;
mod partitioned_csv;
#[cfg_attr(not(feature = "unicode_expressions"), ignore)]
pub mod unicode;

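For orientation (not shown in this diff): Cargo builds integration tests from root files under datafusion/tests/, so this module tree is presumably compiled via a root target along the lines of the sketch below; the exact file name is an assumption based on the "sql_integration" suite the review comment mentions.

```rust
// datafusion/tests/sql_integration.rs (assumed root test target; not part of this PR)
mod sql; // pulls in tests/sql/mod.rs, which now declares `mod partitioned_csv;`
```
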
95 changes: 95 additions & 0 deletions datafusion/tests/sql/partitioned_csv.rs
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utility functions for running with a partitioned csv dataset:
Contributor Author commented:
This is a copy/paste of code in context.rs.

When I am finished removing the unrelated tests from context.rs, I will remove the old copy as well.


use std::{io::Write, sync::Arc};

use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion::{
error::Result,
prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext},
};
use tempfile::TempDir;

/// Plan and execute SQL against the given context and return the results
async fn plan_and_collect(
ctx: &mut ExecutionContext,
sql: &str,
) -> Result<Vec<RecordBatch>> {
ctx.sql(sql).await?.collect().await
}

/// Execute SQL against a fresh context over a partitioned CSV dataset and return the results
pub async fn execute(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
let tmp_dir = TempDir::new()?;
let mut ctx = create_ctx(&tmp_dir, partition_count).await?;
plan_and_collect(&mut ctx, sql).await
}

/// Generate CSV partitions within the supplied directory
fn populate_csv_partitions(
tmp_dir: &TempDir,
partition_count: usize,
file_extension: &str,
) -> Result<SchemaRef> {
// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
Field::new("c3", DataType::Boolean, false),
]));

// generate one csv file per partition
for partition in 0..partition_count {
let filename = format!("partition-{}.{}", partition, file_extension);
let file_path = tmp_dir.path().join(&filename);
let mut file = std::fs::File::create(file_path)?;

// generate some data (the default CsvReadOptions treats the first line of each file as a header)
for i in 0..=10 {
let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
file.write_all(data.as_bytes())?;
}
}

Ok(schema)
}

/// Generate a partitioned CSV file and register it with an execution context
pub async fn create_ctx(
tmp_dir: &TempDir,
partition_count: usize,
) -> Result<ExecutionContext> {
let mut ctx =
ExecutionContext::with_config(ExecutionConfig::new().with_target_partitions(8));

let schema = populate_csv_partitions(tmp_dir, partition_count, ".csv")?;

// register csv file with the execution context
ctx.register_csv(
"test",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new().schema(&schema),
)
.await?;

Ok(ctx)
}
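
A short usage sketch for create_ctx from a hypothetical caller; none of the call sites appear in this diff, so the test name and module path below are illustrative.

```rust
// Hypothetical caller in a sibling module under datafusion/tests/sql/.
use super::partitioned_csv::create_ctx;

#[tokio::test]
async fn query_partitioned_csv() -> datafusion::error::Result<()> {
    let tmp_dir = tempfile::TempDir::new()?;
    // Generates two CSV partition files and registers them as table `test`.
    let mut ctx = create_ctx(&tmp_dir, 2).await?;

    // Same pattern as `plan_and_collect` above: plan the SQL statement,
    // then collect all partitions into a Vec<RecordBatch>.
    let batches = ctx
        .sql("SELECT c1, c2, c3 FROM test")
        .await?
        .collect()
        .await?;
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 20); // 2 partitions x 10 data rows (header line excluded)
    Ok(())
}
```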