Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(expr): implement pretty print for Chunk #6597

Merged
merged 5 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/expression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ common-arrow = { path = "../arrow" }

# Crates.io dependencies
chrono-tz = "0.6.1"
comfy-table = "6"
enum-as-inner = "0.4"
itertools = "0.10"

[dev-dependencies]
goldenfile = "1.4"
23 changes: 22 additions & 1 deletion common/expression/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,29 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;

use crate::values::Column;

pub struct Chunk {
pub columns: Vec<Column>,
columns: Vec<Column>,
}

impl Chunk {
pub fn new(columns: Vec<Column>) -> Self {
assert!(columns.iter().map(|col| col.len()).all_equal());
andylokandy marked this conversation as resolved.
Show resolved Hide resolved
Self { columns }
}

/// Borrows the chunk's columns as a slice, in the order they were supplied.
pub fn columns(&self) -> &[Column] {
    self.columns.as_slice()
}

/// Number of rows in the chunk, taken from the length of the first column.
///
/// A chunk without any columns reports zero rows.
pub fn num_rows(&self) -> usize {
    self.columns.first().map_or(0, Column::len)
}

/// Number of columns held by the chunk.
pub fn num_columns(&self) -> usize {
    let held = &self.columns;
    held.len()
}
}
92 changes: 61 additions & 31 deletions common/expression/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;

use comfy_table::Cell;
use comfy_table::Table;
use itertools::Itertools;

use crate::chunk::Chunk;
use crate::expression::Expr;
use crate::expression::Literal;
use crate::expression::RawExpr;
Expand All @@ -24,40 +29,65 @@ use crate::property::FunctionProperty;
use crate::property::ValueProperty;
use crate::types::DataType;
use crate::types::ValueType;
use crate::values::ScalarRef;
use crate::values::Value;
use crate::values::ValueRef;

///! Convert a column of record batches into a table
// fn create_table(results: &[DataBlock]) -> Result<Table> {
// let mut table = Table::new();
// table.load_preset("||--+-++| ++++++");

// if results.is_empty() {
// return Ok(table);
// }

// let schema = results[0].schema();

// let mut header = Vec::new();
// for field in schema.fields() {
// header.push(Cell::new(field.name()));
// }
// table.set_header(header);

// for batch in results {
// for row in 0..batch.num_rows() {
// let mut cells = Vec::new();
// for col in 0..batch.num_columns() {
// let column = batch.column(col);
// let str = column.get_checked(row)?.to_string();
// cells.push(Cell::new(&str));
// }
// table.add_row(cells);
// }
// }

// Ok(table)
// }
impl Debug for Chunk {
    /// Writes a two-column table with one row per column of the chunk: the
    /// column's position followed by that column's own `Debug` output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut layout = Table::new();
        layout.load_preset("||--+-++| ++++++");
        layout.set_header(vec!["Column ID", "Column Data"]);

        self.columns()
            .iter()
            .enumerate()
            .for_each(|(position, column)| {
                layout.add_row(vec![position.to_string(), format!("{column:?}")]);
            });

        write!(f, "{layout}")
    }
}

impl Display for Chunk {
    /// Writes the chunk as a table: a header cell `Column <n>` per column and
    /// one table row per chunk row, each cell being the `Display` output of
    /// the value at that row index.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut grid = Table::new();
        grid.load_preset("||--+-++| ++++++");

        let header = (0..self.num_columns()).map(|idx| format!("Column {idx}"));
        grid.set_header(header);

        for row_idx in 0..self.num_rows() {
            // Collect the textual form of every column's value at this row.
            let mut cells = Vec::with_capacity(self.num_columns());
            for column in self.columns() {
                let text = column.index(row_idx).to_string();
                cells.push(Cell::new(text));
            }
            grid.add_row(cells);
        }

        write!(f, "{grid}")
    }
}

impl<'a> Display for ScalarRef<'a> {
    /// Writes a human-readable form of the scalar:
    /// `NULL` for null, `[]` for the empty array, the plain value for
    /// integers and booleans, lossily decoded UTF-8 for strings,
    /// `[a, b, ...]` for arrays and `(a, b, ...)` for tuples.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            ScalarRef::Null => f.write_str("NULL"),
            ScalarRef::EmptyArray => f.write_str("[]"),
            ScalarRef::Int8(value) => write!(f, "{value}"),
            ScalarRef::Int16(value) => write!(f, "{value}"),
            ScalarRef::UInt8(value) => write!(f, "{value}"),
            ScalarRef::UInt16(value) => write!(f, "{value}"),
            ScalarRef::Boolean(flag) => write!(f, "{flag}"),
            // Invalid UTF-8 sequences are replaced rather than rejected.
            ScalarRef::String(bytes) => f.write_str(&String::from_utf8_lossy(bytes)),
            ScalarRef::Array(items) => {
                let rendered = items.iter().join(", ");
                write!(f, "[{rendered}]")
            }
            ScalarRef::Tuple(members) => {
                let rendered = members.iter().map(ScalarRef::to_string).join(", ");
                write!(f, "({rendered})")
            }
        }
    }
}

impl Display for RawExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
11 changes: 10 additions & 1 deletion common/expression/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use itertools::Itertools;

use crate::chunk::Chunk;
use crate::expression::Expr;
use crate::expression::Literal;
Expand All @@ -33,7 +35,7 @@ impl Evaluator {
pub fn run(&self, expr: &Expr) -> Value<AnyType> {
match expr {
Expr::Literal(lit) => Value::Scalar(self.run_lit(lit)),
Expr::ColumnRef { id } => Value::Column(self.input_columns.columns[*id].clone()),
Expr::ColumnRef { id } => Value::Column(self.input_columns.columns()[*id].clone()),
Expr::FunctionCall {
function,
args,
Expand All @@ -44,6 +46,13 @@ impl Evaluator {
.iter()
.map(|(expr, _)| self.run(expr))
.collect::<Vec<_>>();
assert!(cols
.iter()
.filter_map(|val| match val {
Value::Column(col) => Some(col.len()),
Value::Scalar(_) => None,
})
.all_equal());
let cols_ref = cols.iter().map(Value::as_ref).collect::<Vec<_>>();
(function.eval)(cols_ref.as_slice(), generics)
}
Expand Down
2 changes: 1 addition & 1 deletion common/expression/src/types/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<T: ArgType> ArgType for ArrayType<T> {
}

fn build_column((builder, offsets): Self::ColumnBuilder) -> Self::Column {
// TODO: check that they have same length
assert_eq!(T::builder_len(&builder), offsets.len());
(T::build_column(builder), offsets.into())
}

Expand Down
26 changes: 13 additions & 13 deletions common/expression/src/types/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,41 +120,41 @@ impl<T: ArgType> ArgType for NullableType<T> {
validity.len()
}

fn push_item((col, validity): &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>) {
fn push_item((builder, validity): &mut Self::ColumnBuilder, item: Self::ScalarRef<'_>) {
match item {
Some(scalar) => {
T::push_item(col, scalar);
T::push_item(builder, scalar);
validity.push(true);
}
None => {
T::push_default(col);
T::push_default(builder);
validity.push(false);
}
}
}

fn push_default((col, validity): &mut Self::ColumnBuilder) {
T::push_default(col);
fn push_default((builder, validity): &mut Self::ColumnBuilder) {
T::push_default(builder);
validity.push(false);
}

fn append_builder(
(col, validity): &mut Self::ColumnBuilder,
(other_col, other_nulls): &Self::ColumnBuilder,
(builder, validity): &mut Self::ColumnBuilder,
(other_builder, other_nulls): &Self::ColumnBuilder,
) {
T::append_builder(col, other_col);
T::append_builder(builder, other_builder);
validity.extend_from_slice(other_nulls.as_slice(), 0, other_nulls.len());
}

fn build_column((col, validity): Self::ColumnBuilder) -> Self::Column {
// TODO: check that they have same length
(T::build_column(col), validity.into())
fn build_column((builder, validity): Self::ColumnBuilder) -> Self::Column {
assert_eq!(T::builder_len(&builder), validity.len());
(T::build_column(builder), validity.into())
}

fn build_scalar((col, validity): Self::ColumnBuilder) -> Self::Scalar {
fn build_scalar((builder, validity): Self::ColumnBuilder) -> Self::Scalar {
assert_eq!(validity.len(), 1);
if validity.get(0) {
Some(T::build_scalar(col))
Some(T::build_scalar(builder))
} else {
None
}
Expand Down
67 changes: 19 additions & 48 deletions common/expression/src/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,65 +633,36 @@ impl ColumnBuilder {
}

pub fn build_scalar(self) -> Scalar {
assert_eq!(self.len(), 1);
match self {
ColumnBuilder::Null { len } => {
assert_eq!(len, 1);
Scalar::Null
}
ColumnBuilder::EmptyArray { len } => {
assert_eq!(len, 1);
Scalar::EmptyArray
}
ColumnBuilder::Int8(builder) => {
assert_eq!(builder.len(), 1);
Scalar::Int8(builder[0])
}
ColumnBuilder::Int16(builder) => {
assert_eq!(builder.len(), 1);
Scalar::Int16(builder[0])
}
ColumnBuilder::UInt8(builder) => {
assert_eq!(builder.len(), 1);
Scalar::UInt8(builder[0])
}
ColumnBuilder::UInt16(builder) => {
assert_eq!(builder.len(), 1);
Scalar::UInt16(builder[0])
}
ColumnBuilder::Boolean(builder) => {
assert_eq!(builder.len(), 1);
Scalar::Boolean(builder.get(0))
}
ColumnBuilder::Null { .. } => Scalar::Null,
ColumnBuilder::EmptyArray { .. } => Scalar::EmptyArray,
ColumnBuilder::Int8(builder) => Scalar::Int8(builder[0]),
ColumnBuilder::Int16(builder) => Scalar::Int16(builder[0]),
ColumnBuilder::UInt8(builder) => Scalar::UInt8(builder[0]),
ColumnBuilder::UInt16(builder) => Scalar::UInt16(builder[0]),
ColumnBuilder::Boolean(builder) => Scalar::Boolean(builder.get(0)),
ColumnBuilder::String { data, offsets } => {
assert_eq!(offsets.len(), 2);
Scalar::String(data[(offsets[0] as usize)..(offsets[1] as usize)].to_vec())
}
ColumnBuilder::Array { array, offsets } => {
assert_eq!(offsets.len(), 2);
Scalar::Array(
array
.build()
.slice((offsets[0] as usize)..(offsets[1] as usize)),
)
}
ColumnBuilder::Array { array, offsets } => Scalar::Array(
array
.build()
.slice((offsets[0] as usize)..(offsets[1] as usize)),
),
ColumnBuilder::Nullable { column, validity } => {
assert_eq!(column.len(), 1);
assert_eq!(validity.len(), 1);
if validity.get(0) {
column.build_scalar()
} else {
Scalar::Null
}
}
ColumnBuilder::Tuple { fields, len } => {
assert_eq!(len, 1);
Scalar::Tuple(
fields
.into_iter()
.map(|field| field.build_scalar())
.collect(),
)
}
ColumnBuilder::Tuple { fields, .. } => Scalar::Tuple(
fields
.into_iter()
.map(|field| field.build_scalar())
.collect(),
),
}
}
}
Expand Down
27 changes: 19 additions & 8 deletions common/expression/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use common_expression::values::Value;
use common_expression::values::ValueRef;
use goldenfile::Mint;

// TODO: Pretty print the result and input columns
fn run_ast(file: &mut impl Write, raw: &RawExpr, columns: Vec<Column>) {
writeln!(file, "raw expr : {raw}").unwrap();

Expand All @@ -51,15 +50,27 @@ fn run_ast(file: &mut impl Write, raw: &RawExpr, columns: Vec<Column>) {
writeln!(file, "property : {prop}").unwrap();
writeln!(file, "checked expr : {expr}").unwrap();

let chunk = Chunk::new(columns);
if chunk.num_columns() > 0 {
writeln!(file, "input chunk:\n{}", chunk).unwrap();
writeln!(file, "input chunk (internal):\n{:?}", chunk).unwrap();
}

let runtime = Evaluator {
input_columns: Chunk { columns },
input_columns: chunk,
context: FunctionContext::default(),
};
let result = runtime.run(&expr);
for (i, col) in runtime.input_columns.columns.iter().enumerate() {
writeln!(file, "column[{i}] : {col:?}").unwrap();
match result {
Value::Scalar(scalar) => writeln!(file, "evaluation result:\n{}", scalar.as_ref()).unwrap(),
Value::Column(col) => {
let chunk = Chunk::new(vec![col]);
writeln!(file, "evaluation result:\n{}", chunk).unwrap();
writeln!(file, "evaluation result (internal):\n{:?}", chunk).unwrap();
}
}
writeln!(file, "eval result : {result}\n").unwrap();

write!(file, "\n\n").unwrap();
}

#[test]
Expand Down Expand Up @@ -421,13 +432,13 @@ pub fn test() {
array: Box::new(Column::Int16((0..100).collect())),
offsets: vec![
0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90,
100,
95, 100,
]
.into(),
}),
offsets: vec![0, 4, 8, 12, 16, 20].into(),
offsets: vec![0, 4, 8, 11, 15, 20].into(),
},
Column::UInt8(vec![0, 1, 2].into()),
Column::UInt8(vec![0, 1, 2, 3, 4].into()),
],
);
}
Expand Down
Loading