move some tests out of context and into sql (#1846)
* move some code out of context and into sql

* extract some more

* extract a bit more
alamb authored Feb 17, 2022
1 parent 584bb75 commit 4c0b17f
Showing 8 changed files with 361 additions and 347 deletions.
348 changes: 4 additions & 344 deletions datafusion/src/execution/context.rs

Large diffs are not rendered by default.

50 changes: 50 additions & 0 deletions datafusion/tests/sql/create_drop.rs
@@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::io::Write;

use tempfile::TempDir;

use super::*;

#[tokio::test]
@@ -76,3 +80,49 @@ async fn csv_query_create_external_table() {
    ];
    assert_batches_eq!(expected, &actual);
}

#[tokio::test]
async fn create_external_table_with_timestamps() {
    let mut ctx = ExecutionContext::new();

    let data = "Jorge,2018-12-13T12:12:10.011Z\n\
                Andrew,2018-11-13T17:11:10.011Z";

    let tmp_dir = TempDir::new().unwrap();
    let file_path = tmp_dir.path().join("timestamps.csv");

    // scope to ensure the file is closed and written
    {
        std::fs::File::create(&file_path)
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");
    }

    let sql = format!(
        "CREATE EXTERNAL TABLE csv_with_timestamps (
          name VARCHAR,
          ts TIMESTAMP
        )
        STORED AS CSV
        LOCATION '{}'
        ",
        file_path.to_str().expect("path is utf8")
    );

    plan_and_collect(&mut ctx, &sql)
        .await
        .expect("Executing CREATE EXTERNAL TABLE");

    let sql = "SELECT * from csv_with_timestamps";
    let result = plan_and_collect(&mut ctx, sql).await.unwrap();
    let expected = vec![
        "+--------+-------------------------+",
        "| name   | ts                      |",
        "+--------+-------------------------+",
        "| Andrew | 2018-11-13 17:11:10.011 |",
        "| Jorge  | 2018-12-13 12:12:10.011 |",
        "+--------+-------------------------+",
    ];
    assert_batches_sorted_eq!(expected, &result);
}
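
Both statements above go through the shared plan_and_collect helper defined in datafusion/tests/sql/mod.rs, which this diff does not show. A minimal sketch of what such a helper looks like against the ExecutionContext API of this era (the body is an assumption, not code from this commit):

async fn plan_and_collect(
    ctx: &mut ExecutionContext,
    sql: &str,
) -> Result<Vec<RecordBatch>> {
    // plan the SQL statement into a DataFrame, then execute it to completion
    ctx.sql(sql).await?.collect().await
}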
2 changes: 1 addition & 1 deletion datafusion/tests/sql/mod.rs
@@ -883,7 +883,7 @@ async fn nyc() -> Result<()> {
            },
            _ => unreachable!(),
        },
-        _ => unreachable!(false),
+        _ => unreachable!(),
    }

    Ok(())
74 changes: 74 additions & 0 deletions datafusion/tests/sql/order.rs
@@ -124,3 +124,77 @@ async fn test_specific_nulls_first_asc() -> Result<()> {
    assert_batches_eq!(expected, &actual);
    Ok(())
}

#[tokio::test]
async fn sort() -> Result<()> {
    let results =
        partitioned_csv::execute("SELECT c1, c2 FROM test ORDER BY c1 DESC, c2 ASC", 4)
            .await?;
    assert_eq!(results.len(), 1);

    let expected: Vec<&str> = vec![
        "+----+----+",
        "| c1 | c2 |",
        "+----+----+",
        "| 3  | 1  |",
        "| 3  | 2  |",
        "| 3  | 3  |",
        "| 3  | 4  |",
        "| 3  | 5  |",
        "| 3  | 6  |",
        "| 3  | 7  |",
        "| 3  | 8  |",
        "| 3  | 9  |",
        "| 3  | 10 |",
        "| 2  | 1  |",
        "| 2  | 2  |",
        "| 2  | 3  |",
        "| 2  | 4  |",
        "| 2  | 5  |",
        "| 2  | 6  |",
        "| 2  | 7  |",
        "| 2  | 8  |",
        "| 2  | 9  |",
        "| 2  | 10 |",
        "| 1  | 1  |",
        "| 1  | 2  |",
        "| 1  | 3  |",
        "| 1  | 4  |",
        "| 1  | 5  |",
        "| 1  | 6  |",
        "| 1  | 7  |",
        "| 1  | 8  |",
        "| 1  | 9  |",
        "| 1  | 10 |",
        "| 0  | 1  |",
        "| 0  | 2  |",
        "| 0  | 3  |",
        "| 0  | 4  |",
        "| 0  | 5  |",
        "| 0  | 6  |",
        "| 0  | 7  |",
        "| 0  | 8  |",
        "| 0  | 9  |",
        "| 0  | 10 |",
        "+----+----+",
    ];

    // Note it is important to NOT use assert_batches_sorted_eq
    // here as we are testing the sortedness of the output
    assert_batches_eq!(expected, &results);

    Ok(())
}

#[tokio::test]
async fn sort_empty() -> Result<()> {
    // The predicate on this query purposely generates no results
    let results = partitioned_csv::execute(
        "SELECT c1, c2 FROM test WHERE c1 > 100000 ORDER BY c1 DESC, c2 ASC",
        4,
    )
    .await
    .unwrap();
    assert_eq!(results.len(), 0);
    Ok(())
}
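
Both sort tests drive their queries through partitioned_csv::execute, the helper module whose doc comment is touched later in this diff. A sketch of what such a helper plausibly does; the table name "test", the two-column schema, and the generated row values are assumptions for illustration:

use std::io::Write;

async fn execute_sketch(sql: &str, partition_count: usize) -> Result<Vec<RecordBatch>> {
    let tmp_dir = TempDir::new()?;

    // write one CSV file per partition: c1 holds the partition id, c2 counts upward
    for partition in 0..partition_count {
        let path = tmp_dir.path().join(format!("partition-{}.csv", partition));
        let mut file = std::fs::File::create(path)?;
        for i in 1..=10 {
            writeln!(file, "{},{}", partition, i)?;
        }
    }

    // register the directory as table "test" and run the query
    let schema = Schema::new(vec![
        Field::new("c1", DataType::UInt32, false),
        Field::new("c2", DataType::UInt64, false),
    ]);
    let mut ctx = ExecutionContext::new();
    ctx.register_csv(
        "test",
        tmp_dir.path().to_str().unwrap(),
        CsvReadOptions::new().schema(&schema).has_header(false),
    )
    .await?;
    plan_and_collect(&mut ctx, sql).await
}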
59 changes: 59 additions & 0 deletions datafusion/tests/sql/parquet.rs
@@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use std::{collections::HashMap, fs, path::Path};

use ::parquet::arrow::ArrowWriter;
use tempfile::TempDir;

use super::*;

#[tokio::test]
@@ -162,3 +167,57 @@ async fn parquet_list_columns() {
    assert_eq!(result.value(2), "hij");
    assert_eq!(result.value(3), "xyz");
}

#[tokio::test]
async fn schema_merge_ignores_metadata() {
    // Create two parquet files in same table with same schema but different metadata
    let tmp_dir = TempDir::new().unwrap();
    let table_dir = tmp_dir.path().join("parquet_test");
    let table_path = Path::new(&table_dir);

    let mut non_empty_metadata: HashMap<String, String> = HashMap::new();
    non_empty_metadata.insert("testing".to_string(), "metadata".to_string());

    let fields = vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ];
    let schemas = vec![
        Arc::new(Schema::new_with_metadata(
            fields.clone(),
            non_empty_metadata.clone(),
        )),
        Arc::new(Schema::new(fields.clone())),
    ];

    if let Ok(()) = fs::create_dir(table_path) {
        for (i, schema) in schemas.iter().enumerate().take(2) {
            let filename = format!("part-{}.parquet", i);
            let path = table_path.join(&filename);
            let file = fs::File::create(path).unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None)
                    .unwrap();

            // create mock record batch
            let ids = Arc::new(Int32Array::from_slice(&[i as i32]));
            let names = Arc::new(StringArray::from_slice(&["test"]));
            let rec_batch =
                RecordBatch::try_new(schema.clone(), vec![ids, names]).unwrap();

            writer.write(&rec_batch).unwrap();
            writer.close().unwrap();
        }
    }

    // Read the parquet files into a dataframe to confirm results
    // (no errors)
    let mut ctx = ExecutionContext::new();
    let df = ctx
        .read_parquet(table_dir.to_str().unwrap().to_string())
        .await
        .unwrap();
    let result = df.collect().await.unwrap();

    assert_eq!(result[0].schema().metadata(), result[1].schema().metadata());
}
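
The final assertion captures the point of the test: after reading both files back, the two batches report identical merged metadata rather than failing on the mismatch. A standalone illustration of the underlying behavior using Arrow's Schema::try_merge; that DataFusion's schema inference reduces to exactly this call is an assumption:

use std::collections::HashMap;

use arrow::datatypes::{DataType, Field, Schema};

fn metadata_merge_example() -> arrow::error::Result<()> {
    let mut metadata = HashMap::new();
    metadata.insert("testing".to_string(), "metadata".to_string());
    let fields = vec![Field::new("id", DataType::Int32, true)];

    // one schema carries metadata, the other does not; the merge succeeds
    let merged = Schema::try_merge(vec![
        Schema::new_with_metadata(fields.clone(), metadata.clone()),
        Schema::new(fields),
    ])?;

    // and the merged schema keeps the non-conflicting metadata
    assert_eq!(merged.metadata(), &metadata);
    Ok(())
}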
2 changes: 1 addition & 1 deletion datafusion/tests/sql/partitioned_csv.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

-//! Utility functions for running with a partitioned csv dataset:
+//! Utility functions for creating and running with a partitioned csv dataset.

use std::{io::Write, sync::Arc};

18 changes: 17 additions & 1 deletion datafusion/tests/sql/select.rs
@@ -16,7 +16,10 @@
// under the License.

use super::*;
-use datafusion::{from_slice::FromSlice, physical_plan::collect_partitioned};
+use datafusion::{
+    datasource::empty::EmptyTable, from_slice::FromSlice,
+    physical_plan::collect_partitioned,
+};
use tempfile::TempDir;

#[tokio::test]
@@ -985,3 +988,16 @@ async fn parallel_query_with_filter() -> Result<()> {

    Ok(())
}

#[tokio::test]
async fn query_empty_table() {
    let mut ctx = ExecutionContext::new();
    let empty_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())));
    ctx.register_table("test_tbl", empty_table).unwrap();
    let sql = "SELECT * FROM test_tbl";
    let result = plan_and_collect(&mut ctx, sql)
        .await
        .expect("Query empty table");
    let expected = vec!["++", "++"];
    assert_batches_sorted_eq!(expected, &result);
}