Improve transform ordering using explicit ordering column #222

Merged · 16 commits · Jan 12, 2023

14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -26,6 +26,6 @@ arrow-array = { git = "https://github.com/jonmmease/arrow-rs.git", rev = "59a528
 arrow-select = { git = "https://github.com/jonmmease/arrow-rs.git", rev = "59a5289b3603db08433e5309eed488a0abbf5d0d"}

 # DataFusion 16.0 with backports
-datafusion = { git = "https://github.com/jonmmease/arrow-datafusion.git", rev = "1d8c71a4703d27564c1a2bd60b164769f33fbe8f"}
-datafusion-common = { git = "https://github.com/jonmmease/arrow-datafusion.git", rev = "1d8c71a4703d27564c1a2bd60b164769f33fbe8f"}
-datafusion-expr = { git = "https://github.com/jonmmease/arrow-datafusion.git", rev = "1d8c71a4703d27564c1a2bd60b164769f33fbe8f"}
+datafusion = { git = "https://github.com/jonmmease/arrow-datafusion.git", rev = "e3f156f4acc51fc45a7aa6a99085b091f539d5fa"}
+datafusion-common = { git = "https://github.com/jonmmease/arrow-datafusion.git", rev = "e3f156f4acc51fc45a7aa6a99085b091f539d5fa"}
+datafusion-expr = { git = "https://github.com/jonmmease/arrow-datafusion.git", rev = "e3f156f4acc51fc45a7aa6a99085b091f539d5fa"}
5 changes: 3 additions & 2 deletions python/vegafusion/vegafusion/transformer.py
@@ -67,8 +67,9 @@ def to_arrow_table(data):
     except IndexError:
         pass

-    # Convert DataFrame to table
-    table = pa.Table.from_pandas(data)
+    # Convert DataFrame to table. Keep index only if named
+    preserve_index = bool([name for name in getattr(data.index, "names", []) if name])
+    table = pa.Table.from_pandas(data, preserve_index=preserve_index)

     return table
5 changes: 5 additions & 0 deletions vegafusion-core/src/data/mod.rs
@@ -1,4 +1,9 @@
+use arrow::datatypes::DataType;
+
 pub mod json_writer;
 pub mod scalar;
 pub mod table;
 pub mod tasks;
+
+pub const ORDER_COL: &str = "_vf_order";
+pub const ORDER_COL_DTYPE: DataType = DataType::UInt32;
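
The two constants above define the contract used throughout this PR: every materialized table gets a leading `_vf_order` column of type `UInt32` recording the original row index. As a minimal sketch of why that is sufficient (not part of the PR; it drives arrow's `sort_to_indices` and `take` kernels directly), sorting on the order column recovers the input order after rows have been shuffled:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray, UInt32Array};
use arrow::compute::{sort_to_indices, take};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("_vf_order", DataType::UInt32, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    // Rows arrive shuffled (e.g. after a transform); _vf_order remembers input order.
    let order: ArrayRef = Arc::new(UInt32Array::from(vec![2u32, 0, 1]));
    let b: ArrayRef = Arc::new(StringArray::from(vec!["CCC", "A", "BB"]));
    let batch = RecordBatch::try_new(schema, vec![order.clone(), b])?;

    // Indices that sort the order column ascending...
    let indices = sort_to_indices(&order, None, None)?;
    // ...applied to every column restore the original row order: A, BB, CCC.
    let columns = batch
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices, None))
        .collect::<arrow::error::Result<Vec<ArrayRef>>>()?;
    let restored = RecordBatch::try_new(batch.schema(), columns)?;
    assert_eq!(restored.num_rows(), 3);
    Ok(())
}
```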
99 changes: 97 additions & 2 deletions vegafusion-core/src/data/table.rs
@@ -19,8 +19,9 @@ use super::scalar::ScalarValue;
 use crate::arrow::array::ArrayRef;
 use crate::data::json_writer::record_batches_to_json_rows;

-use arrow::array::StructArray;
-use arrow::datatypes::Field;
+use crate::data::{ORDER_COL, ORDER_COL_DTYPE};
+use arrow::array::{StructArray, UInt32Array};
+use arrow::datatypes::{Field, Schema};
 use arrow::json::reader::DecoderOptions;
 use serde_json::{json, Value};

@@ -77,6 +78,50 @@ impl VegaFusionTable {
         }
     }

+    pub fn with_ordering(self) -> Result<Self> {
+        // Build new schema with leading ORDER_COL
+        let mut new_fields = self.schema.fields.clone();
+        let mut start_idx = 0;
+        let leading_field = new_fields
+            .get(0)
+            .expect("VegaFusionTable must have at least one column");
+        let has_order_col = if leading_field.name() == ORDER_COL {
+            // There is already a leading ORDER_COL, remove it and replace below
+            new_fields.remove(0);
+            true
+        } else {
+            // We need to add a new leading field for the ORDER_COL
+            false
+        };
+        new_fields.insert(0, Field::new(ORDER_COL, ORDER_COL_DTYPE, false));
+
+        let new_schema = Arc::new(Schema::new(new_fields)) as SchemaRef;
+
+        let new_batches = self
+            .batches
+            .into_iter()
+            .map(|batch| {
+                let order_array = Arc::new(UInt32Array::from_iter_values(
+                    start_idx..(start_idx + batch.num_rows() as u32),
+                )) as ArrayRef;
+
+                let mut new_columns = Vec::from(batch.columns());
+
+                if has_order_col {
+                    new_columns[0] = order_array;
+                } else {
+                    new_columns.insert(0, order_array);
+                }
+
+                start_idx += batch.num_rows() as u32;
+
+                Ok(RecordBatch::try_new(new_schema.clone(), new_columns)?)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Self::try_new(new_schema, new_batches)
+    }
+
     pub fn batches(&self) -> &Vec<RecordBatch> {
         &self.batches
     }
@@ -235,3 +280,53 @@ impl Hash for VegaFusionTable {
         self.to_ipc_bytes().unwrap().hash(state)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::data::table::VegaFusionTable;
+    use serde_json::json;
+
+    #[test]
+    fn test_with_ordering() {
+        let table1 = VegaFusionTable::from_json(
+            &json!([
+                {"a": 1, "b": "A"},
+                {"a": 2, "b": "BB"},
+                {"a": 10, "b": "CCC"},
+                {"a": 20, "b": "DDDD"},
+            ]),
+            2,
+        )
+        .unwrap();
+        assert_eq!(table1.batches.len(), 2);
+
+        let table2 = VegaFusionTable::from_json(
+            &json!([
+                {"_vf_order": 10u32, "a": 1, "b": "A"},
+                {"_vf_order": 9u32, "a": 2, "b": "BB"},
+                {"_vf_order": 8u32, "a": 10, "b": "CCC"},
+                {"_vf_order": 7u32, "a": 20, "b": "DDDD"},
+            ]),
+            2,
+        )
+        .unwrap();
+        assert_eq!(table2.batches.len(), 2);
+
+        let expected_json = json!([
+            {"_vf_order": 0u32, "a": 1, "b": "A"},
+            {"_vf_order": 1u32, "a": 2, "b": "BB"},
+            {"_vf_order": 2u32, "a": 10, "b": "CCC"},
+            {"_vf_order": 3u32, "a": 20, "b": "DDDD"},
+        ]);
+
+        // Add ordering column to table without one
+        let result_table1 = table1.with_ordering().unwrap();
+        assert_eq!(result_table1.batches.len(), 2);
+        assert_eq!(result_table1.to_json().unwrap(), expected_json);
+
+        // Override prior ordering column
+        let result_table2 = table2.with_ordering().unwrap();
+        assert_eq!(result_table2.batches.len(), 2);
+        assert_eq!(result_table2.to_json().unwrap(), expected_json);
+    }
+}
8 changes: 6 additions & 2 deletions vegafusion-core/src/spec/transform/impute.rs
@@ -8,6 +8,10 @@ use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use std::collections::HashMap;

+fn default_value() -> Option<Value> {
+    Some(Value::Number(serde_json::Number::from(0)))
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ImputeTransformSpec {
     pub field: Field,
@@ -22,7 +26,8 @@ pub struct ImputeTransformSpec {
     #[serde(skip_serializing_if = "Option::is_none")]
     pub groupby: Option<Vec<Field>>,

-    #[serde(skip_serializing_if = "Option::is_none")]
+    // Default to zero but serialize even if null
+    #[serde(default = "default_value")]
     pub value: Option<Value>,

     #[serde(flatten)]
@@ -63,7 +68,6 @@ impl TransformSpecTrait for ImputeTransformSpec {
             && self.keyvals.is_none()
             && self.method() == ImputeMethodSpec::Value
             && num_unique_groupby <= 1
-            && self.value.is_some()
     }

     fn transform_columns(
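
The `impute` change above swaps `skip_serializing_if` for a serde default, so a spec without a `value` key now deserializes to `Some(0)` and always serializes back out; that is why the `self.value.is_some()` guard in `supported()` could be dropped. A self-contained sketch of the serde behavior, using a trimmed-down stand-in for `ImputeTransformSpec`:

```rust
use serde::{Deserialize, Serialize};
use serde_json::Value;

fn default_value() -> Option<Value> {
    Some(Value::Number(serde_json::Number::from(0)))
}

// Stand-in struct: only the fields needed to show the default behavior.
#[derive(Debug, Serialize, Deserialize)]
struct ImputeSpecSketch {
    field: String,
    key: String,
    #[serde(default = "default_value")]
    value: Option<Value>,
}

fn main() {
    // "value" omitted from the spec: the default of Some(0) kicks in.
    let spec: ImputeSpecSketch =
        serde_json::from_str(r#"{"field": "y", "key": "x"}"#).unwrap();
    assert_eq!(spec.value, Some(Value::Number(0.into())));

    // With skip_serializing_if removed, the value round-trips explicitly.
    let json = serde_json::to_string(&spec).unwrap();
    assert!(json.contains(r#""value":0"#));
}
```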
30 changes: 25 additions & 5 deletions vegafusion-rt-datafusion/src/data/tasks.rs
@@ -30,7 +30,7 @@ use crate::expression::escape::flat_col;
 use crate::sql::connection::datafusion_conn::DataFusionConnection;
 use crate::sql::dataframe::SqlDataFrame;
 use crate::task_graph::timezone::RuntimeTzConfig;
-use crate::transform::pipeline::TransformPipelineUtils;
+use crate::transform::pipeline::{remove_order_col, TransformPipelineUtils};
 use vegafusion_core::data::scalar::{ScalarValue, ScalarValueHelpers};
 use vegafusion_core::data::table::VegaFusionTable;
 use vegafusion_core::error::{Result, ResultWithContext, ToExternalError, VegaFusionError};
@@ -110,8 +110,13 @@ impl TaskCall for DataUrlTask {
             let inline_name = inline_name.trim().to_string();
             if let Some(inline_dataset) = inline_datasets.get(&inline_name) {
                 let sql_df = match inline_dataset {
-                    VegaFusionDataset::Table { table, .. } => table.to_sql_dataframe().await?,
-                    VegaFusionDataset::SqlDataFrame(sql_df) => sql_df.clone(),
+                    VegaFusionDataset::Table { table, .. } => {
+                        table.clone().with_ordering()?.to_sql_dataframe().await?
+                    }
+                    VegaFusionDataset::SqlDataFrame(sql_df) => {
+                        // TODO: if no ordering column present, create with a window expression
+                        sql_df.clone()
+                    }
                 };
                 let sql_df = process_datetimes(&parse, sql_df, &config.tz_config).await?;
                 return eval_sql_df(sql_df.clone(), &self.pipeline, &config).await;
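
The `TODO` in the `SqlDataFrame` arm is left open in this PR. One plausible shape for it, sketched here with a hypothetical helper and a placeholder relation name, is a window expression that numbers rows:

```rust
// Hypothetical sketch of the TODO: derive an ordering column for a
// SQL-backed dataset using a window function. ROW_NUMBER() is 1-based,
// so subtract 1 to match the 0-based UInt32 values that with_ordering()
// assigns to materialized tables.
fn order_col_query(source: &str) -> String {
    format!(
        "SELECT ROW_NUMBER() OVER () - 1 AS _vf_order, * FROM {}",
        source
    )
}

fn main() {
    println!("{}", order_col_query("source"));
}
```

Without an `ORDER BY` inside `OVER ()`, the numbering is engine-dependent, which is one reason materialized tables get an explicit, precomputed column instead.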
@@ -162,7 +167,8 @@ async fn eval_sql_df(
         let pipeline = pipeline.as_ref().unwrap();
         pipeline.eval_sql(sql_df, config).await?
     } else {
-        // No transforms
+        // No transforms, just remove any ordering column
+        let sql_df = remove_order_col(sql_df).await?;
         (sql_df.collect().await?, Vec::new())
     };
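
`remove_order_col` is not shown in this diff; the name indicates it projects the internal ordering column away so `_vf_order` never leaks into results handed back to the client. A standalone sketch of that idea over a plain `RecordBatch` (an assumption; the real function operates on a `SqlDataFrame`):

```rust
use arrow::record_batch::RecordBatch;

const ORDER_COL: &str = "_vf_order";

// Keep every column except the internal ordering column.
fn drop_order_col(batch: &RecordBatch) -> RecordBatch {
    let keep: Vec<usize> = (0..batch.num_columns())
        .filter(|&i| batch.schema().field(i).name() != ORDER_COL)
        .collect();
    batch.project(&keep).expect("column indices are in range")
}
```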

@@ -416,6 +422,9 @@ impl TaskCall for DataValuesTask
            return Ok((TaskValue::Table(values_table), Default::default()));
        }

+        // Add ordering column
+        let values_table = values_table.with_ordering()?;
+
        // Get parse format for date processing
        let parse = self.format_type.as_ref().and_then(|fmt| fmt.parse.clone());

@@ -469,6 +478,9 @@ impl TaskCall for DataSourceTask
            )
        });

+        // Add ordering column
+        let source_table = source_table.with_ordering()?;
+
        // Apply transforms (if any)
        let (transformed_table, output_values) = if self
            .pipeline
@@ -531,12 +543,18 @@ async fn read_csv(url: String, parse: &Option<Parse>) -> Result<DataFrame> {
         // Load through VegaFusionTable so that temp file can be deleted
         let df = ctx.read_csv(path, csv_opts).await.unwrap();
         let table = VegaFusionTable::from_dataframe(df).await.unwrap();
+        let table = table.with_ordering()?;
         let df = table.to_dataframe().await.unwrap();
         Ok(df)
     } else {
         let schema = build_csv_schema(&csv_opts, &url, parse).await?;
         let csv_opts = csv_opts.schema(&schema);
-        Ok(ctx.read_csv(url, csv_opts).await?)
+
+        let df = ctx.read_csv(url, csv_opts).await.unwrap();
+        let table = VegaFusionTable::from_dataframe(df).await.unwrap();
+        let table = table.with_ordering()?;
+        let df = table.to_dataframe().await.unwrap();
+        Ok(df)
     }
 }

@@ -622,6 +640,7 @@ async fn read_json(url: &str, batch_size: usize) -> Result<DataFrame> {
     };

     VegaFusionTable::from_json(&value, batch_size)?
+        .with_ordering()?
         .to_dataframe()
         .await
 }
@@ -678,6 +697,7 @@ async fn read_arrow(url: &str) -> Result<DataFrame> {
     };

     VegaFusionTable::try_new(schema, batches)?
+        .with_ordering()?
         .to_dataframe()
         .await
 }