diff --git a/javascript/vegafusion-embed/package-lock.json b/javascript/vegafusion-embed/package-lock.json index d6835d332..2740db204 100644 --- a/javascript/vegafusion-embed/package-lock.json +++ b/javascript/vegafusion-embed/package-lock.json @@ -6,7 +6,7 @@ "packages": { "": { "name": "vegafusion-embed", - "version": "0.3.0", + "version": "0.4.1", "license": "AGPL-3.0-or-later", "dependencies": { "grpc-web": "^1.3.1", @@ -39,7 +39,7 @@ }, "../../vegafusion-wasm/pkg": { "name": "vegafusion-wasm", - "version": "0.1.0", + "version": "0.4.1", "license": "AGPL-3.0-or-later", "dependencies": { "bootstrap": "^5.1.3", @@ -11569,4 +11569,4 @@ "integrity": "sha512-y11nGElTIV+CT3Zv9t7VKl+Q3hTQoT9a1Qzezhhl6Rp21gJ/IVTW7Z3y9EWXhuUBC2Shnf+DX0antecpAwSP8w==" } } -} \ No newline at end of file +} diff --git a/python/vegafusion-jupyter/package-lock.json b/python/vegafusion-jupyter/package-lock.json index 506492c81..05ef464e1 100644 --- a/python/vegafusion-jupyter/package-lock.json +++ b/python/vegafusion-jupyter/package-lock.json @@ -6,7 +6,7 @@ "packages": { "": { "name": "vegafusion-jupyter", - "version": "0.4.0", + "version": "0.4.1", "license": "AGPL-3.0-or-later", "dependencies": { "@jupyter-widgets/base": "^1.1.10 || ^2.0.0 || ^3.0.0 || ^4.0.0", @@ -50,7 +50,7 @@ } }, "../../javascript/vegafusion-embed": { - "version": "0.4.0", + "version": "0.4.1", "license": "AGPL-3.0-or-later", "dependencies": { "grpc-web": "^1.3.1", @@ -9715,7 +9715,7 @@ "resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.2.5.tgz", "integrity": "sha512-PIeMbHqMt4DnUP3MA/Flc0HElYjMXArsw1qwJZcm9sqR8mq3l8NYizFMty0pWwE/tzIGH3EKK5+jes5mAr85yw==", "funding": { - "type": "GitHub Sponsors \u2764", + "type": "GitHub Sponsors ❤", "url": "https://github.com/sponsors/dmonad" } }, @@ -11553,7 +11553,7 @@ "node": ">=12" }, "funding": { - "type": "GitHub Sponsors \u2764", + "type": "GitHub Sponsors ❤", "url": "https://github.com/sponsors/dmonad" } }, @@ -18457,7 +18457,7 @@ "lib0": "^0.2.42" }, "funding": { - "type": "GitHub Sponsors \u2764", + "type": "GitHub Sponsors ❤", "url": "https://github.com/sponsors/dmonad" }, "peerDependencies": { @@ -18475,7 +18475,7 @@ "lib0": "^0.2.31" }, "funding": { - "type": "GitHub Sponsors \u2764", + "type": "GitHub Sponsors ❤", "url": "https://github.com/sponsors/dmonad" }, "peerDependencies": { @@ -18490,7 +18490,7 @@ "lib0": "^0.2.42" }, "funding": { - "type": "GitHub Sponsors \u2764", + "type": "GitHub Sponsors ❤", "url": "https://github.com/sponsors/dmonad" } }, @@ -18508,7 +18508,7 @@ "y-websocket-server": "bin/server.js" }, "funding": { - "type": "GitHub Sponsors \u2764", + "type": "GitHub Sponsors ❤", "url": "https://github.com/sponsors/dmonad" }, "optionalDependencies": { @@ -18591,7 +18591,7 @@ "lib0": "^0.2.49" }, "funding": { - "type": "GitHub Sponsors \u2764", + "type": "GitHub Sponsors ❤", "url": "https://github.com/sponsors/dmonad" } }, @@ -33061,4 +33061,4 @@ "dev": true } } -} \ No newline at end of file +} diff --git a/vegafusion-core/src/expression/ast/expression.rs b/vegafusion-core/src/expression/ast/expression.rs index 9fd8f675e..549eb1c57 100644 --- a/vegafusion-core/src/expression/ast/expression.rs +++ b/vegafusion-core/src/expression/ast/expression.rs @@ -7,9 +7,13 @@ * this program the details of the active license. */ use crate::error::{Result, VegaFusionError}; +use crate::expression::column_usage::{ + DatasetsColumnUsage, GetDatasetsColumnUsage, VlSelectionFields, +}; use crate::expression::visitors::{ - CheckSupportedExprVisitor, ClearSpansVisitor, ExpressionVisitor, GetInputVariablesVisitor, - ImplicitVariablesExprVisitor, MutExpressionVisitor, UpdateVariablesExprVisitor, + CheckSupportedExprVisitor, ClearSpansVisitor, DatasetsColumnUsageVisitor, ExpressionVisitor, + GetInputVariablesVisitor, ImplicitVariablesExprVisitor, MutExpressionVisitor, + UpdateVariablesExprVisitor, }; use crate::proto::gen::expression::expression::Expr; use crate::proto::gen::expression::{ @@ -18,6 +22,8 @@ use crate::proto::gen::expression::{ UnaryExpression, }; use crate::proto::gen::tasks::Variable; +use crate::task_graph::graph::ScopedVariable; +use crate::task_graph::scope::TaskScope; use crate::task_graph::task::InputVariable; use itertools::sorted; use std::fmt::{Display, Formatter}; @@ -326,3 +332,22 @@ impl> From for Expr { Self::Literal(Literal::new(v, &repr)) } } + +impl GetDatasetsColumnUsage for Expression { + fn datasets_column_usage( + &self, + datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + let mut visitor = DatasetsColumnUsageVisitor::new( + datum_var, + usage_scope, + task_scope, + vl_selection_fields, + ); + self.walk(&mut visitor); + visitor.dataset_column_usage + } +} diff --git a/vegafusion-core/src/expression/column_usage.rs b/vegafusion-core/src/expression/column_usage.rs new file mode 100644 index 000000000..43b0c1e56 --- /dev/null +++ b/vegafusion-core/src/expression/column_usage.rs @@ -0,0 +1,196 @@ +/* + * VegaFusion + * Copyright (C) 2022 VegaFusion Technologies LLC + * + * This program is distributed under multiple licenses. + * Please consult the license documentation provided alongside + * this program the details of the active license. + */ +use crate::task_graph::graph::ScopedVariable; +use crate::task_graph::scope::TaskScope; +use std::collections::{HashMap, HashSet}; + +pub type VlSelectionFields = HashMap>; + +/// Enum storing info on which dataset columns are used in a given context. +/// Due to the dynamic nature of Vega specifications, it's not always possible to statically +/// determine which columns from a dataset will be used at runtime. In this case the +/// ColumnUsage::Unknown variant is used. In the context of projection pushdown, +/// the ColumnUsage::Unknown variant indicates that all of original dataset columns must be +/// maintained +#[derive(Clone, Debug, PartialEq)] +pub enum ColumnUsage { + Unknown, + Known(HashSet), +} + +impl ColumnUsage { + pub fn empty() -> ColumnUsage { + ColumnUsage::Known(Default::default()) + } + + pub fn with_column(&self, column: &str) -> ColumnUsage { + self.union(&ColumnUsage::from(vec![column].as_slice())) + } + + /// Take the union of two ColumnUsage instances. If both are ColumnUsage::Known, then take + /// the union of their known columns. If either is ColumnUsage::Unknown, then the union is + /// also Unknown. + pub fn union(&self, other: &ColumnUsage) -> ColumnUsage { + match (self, other) { + (ColumnUsage::Known(self_cols), ColumnUsage::Known(other_cols)) => { + // If both column usages are known, we can union the known columns + let new_cols: HashSet<_> = self_cols.union(other_cols).cloned().collect(); + ColumnUsage::Known(new_cols) + } + _ => { + // If either is Unknown, then the union is unknown + ColumnUsage::Unknown + } + } + } +} + +impl From<&str> for ColumnUsage { + fn from(column: &str) -> Self { + let columns: HashSet<_> = vec![column.to_string()].into_iter().collect(); + Self::Known(columns) + } +} + +impl From<&[&str]> for ColumnUsage { + fn from(columns: &[&str]) -> Self { + let columns: HashSet<_> = columns.iter().map(|s| s.to_string()).collect(); + Self::Known(columns) + } +} + +impl From<&[String]> for ColumnUsage { + fn from(columns: &[String]) -> Self { + let columns: HashSet<_> = columns.iter().cloned().collect(); + Self::Known(columns) + } +} + +/// Struct that tracks the usage of all columns across a collection of datasets +#[derive(Clone, Debug, PartialEq)] +pub struct DatasetsColumnUsage { + pub usages: HashMap, + pub aliases: HashMap, +} + +impl DatasetsColumnUsage { + pub fn empty() -> Self { + Self { + usages: Default::default(), + aliases: Default::default(), + } + } + + pub fn with_column_usage(&self, datum_var: &ScopedVariable, usage: ColumnUsage) -> Self { + let other_column_usage = Self { + usages: vec![(datum_var.clone(), usage)].into_iter().collect(), + aliases: Default::default(), + }; + self.union(&other_column_usage) + } + + pub fn with_unknown_usage(&self, datum_var: &ScopedVariable) -> Self { + self.with_column_usage(datum_var, ColumnUsage::Unknown) + } + + pub fn with_alias(&self, from: ScopedVariable, to: ScopedVariable) -> Self { + let mut aliases = self.aliases.clone(); + aliases.insert(from, to); + Self { + usages: self.usages.clone(), + aliases, + } + } + + /// Take the union of two DatasetColumnUsage instances. + pub fn union(&self, other: &DatasetsColumnUsage) -> DatasetsColumnUsage { + let self_vars: HashSet<_> = self.usages.keys().cloned().collect(); + let other_vars: HashSet<_> = other.usages.keys().cloned().collect(); + let union_vars: HashSet<_> = self_vars.union(&other_vars).cloned().collect(); + + // Union aliases + let mut aliases = self.aliases.clone(); + for (key, val) in &other.aliases { + aliases.insert(key.clone(), val.clone()); + } + + let mut usages: HashMap = HashMap::new(); + for var in union_vars { + // Check if var is an alias + let var = aliases.get(&var).unwrap_or(&var).clone(); + + let self_usage = self + .usages + .get(&var) + .cloned() + .unwrap_or_else(ColumnUsage::empty); + let other_usage = other + .usages + .get(&var) + .cloned() + .unwrap_or_else(ColumnUsage::empty); + let combined_usage = self_usage.union(&other_usage); + usages.insert(var, combined_usage); + } + + Self { usages, aliases } + } +} + +pub trait GetDatasetsColumnUsage { + fn datasets_column_usage( + &self, + datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage; +} + +#[cfg(test)] +mod tests { + use crate::expression::column_usage::ColumnUsage; + + #[test] + fn test_with_column() { + let left = ColumnUsage::from(vec!["one", "two"].as_slice()); + let result = left.with_column("three").with_column("four"); + let expected = ColumnUsage::from(vec!["one", "two", "three", "four"].as_slice()); + assert_eq!(result, expected) + } + + #[test] + fn test_union_known_known() { + let left = ColumnUsage::from(vec!["one", "two"].as_slice()); + let right = ColumnUsage::from(vec!["two", "three", "four"].as_slice()); + let union = left.union(&right); + let expected = ColumnUsage::from(vec!["one", "two", "three", "four"].as_slice()); + assert_eq!(union, expected) + } + + #[test] + fn test_union_known_unknown() { + let left = ColumnUsage::from(vec!["one", "two"].as_slice()); + let union = left.union(&ColumnUsage::Unknown); + assert_eq!(union, ColumnUsage::Unknown) + } + + #[test] + fn test_union_unknown_known() { + let right = ColumnUsage::from(vec!["two", "three", "four"].as_slice()); + let union = ColumnUsage::Unknown.union(&right); + assert_eq!(union, ColumnUsage::Unknown) + } + + #[test] + fn test_union_unknown_unknown() { + let union = ColumnUsage::Unknown.union(&ColumnUsage::Unknown); + assert_eq!(union, ColumnUsage::Unknown) + } +} diff --git a/vegafusion-core/src/expression/mod.rs b/vegafusion-core/src/expression/mod.rs index 416650bba..f9d333219 100644 --- a/vegafusion-core/src/expression/mod.rs +++ b/vegafusion-core/src/expression/mod.rs @@ -7,6 +7,7 @@ * this program the details of the active license. */ pub mod ast; +pub mod column_usage; pub mod lexer; pub mod ops; pub mod parser; diff --git a/vegafusion-core/src/expression/visitors.rs b/vegafusion-core/src/expression/visitors.rs index ac990109d..71b470cfb 100644 --- a/vegafusion-core/src/expression/visitors.rs +++ b/vegafusion-core/src/expression/visitors.rs @@ -12,12 +12,16 @@ use crate::proto::gen::expression::{ Identifier, Literal, LogicalExpression, MemberExpression, ObjectExpression, UnaryExpression, }; +use crate::expression::column_usage::{ColumnUsage, DatasetsColumnUsage, VlSelectionFields}; use crate::expression::supported::{ ALL_DATA_FNS, ALL_EXPRESSION_CONSTANTS, ALL_SCALE_FNS, IMPLICIT_VARS, SUPPORTED_DATA_FNS, SUPPORTED_EXPRESSION_FNS, SUPPORTED_SCALE_FNS, }; +use crate::proto::gen::expression::expression::Expr; use crate::proto::gen::expression::literal::Value; use crate::proto::gen::tasks::Variable; +use crate::task_graph::graph::ScopedVariable; +use crate::task_graph::scope::TaskScope; use crate::task_graph::task::InputVariable; use std::collections::HashSet; @@ -173,7 +177,7 @@ impl ExpressionVisitor for UpdateVariablesExprVisitor { } } -/// Visitor to collect all unbound input variables in the expression +/// Visitor to check whether an expression is supported by the VegaFusion Runtime #[derive(Clone, Default)] pub struct CheckSupportedExprVisitor { pub supported: bool, @@ -217,7 +221,7 @@ impl ExpressionVisitor for CheckSupportedExprVisitor { } } -/// Visitor to collect all output variables in the expression +/// Visitor to collect all implicit variables used in an expression #[derive(Clone, Default)] pub struct ImplicitVariablesExprVisitor { pub implicit_vars: HashSet, @@ -239,3 +243,138 @@ impl ExpressionVisitor for ImplicitVariablesExprVisitor { } } } + +/// Visitor to collect the columns +#[derive(Clone)] +pub struct DatasetsColumnUsageVisitor<'a> { + pub vl_selection_fields: &'a VlSelectionFields, + pub datum_var: &'a Option, + pub usage_scope: &'a [u32], + pub task_scope: &'a TaskScope, + pub dataset_column_usage: DatasetsColumnUsage, +} + +impl<'a> DatasetsColumnUsageVisitor<'a> { + pub fn new( + datum_var: &'a Option, + usage_scope: &'a [u32], + task_scope: &'a TaskScope, + vl_selection_fields: &'a VlSelectionFields, + ) -> Self { + Self { + vl_selection_fields, + datum_var, + usage_scope, + task_scope, + dataset_column_usage: DatasetsColumnUsage::empty(), + } + } +} + +impl<'a> ExpressionVisitor for DatasetsColumnUsageVisitor<'a> { + fn visit_member(&mut self, node: &MemberExpression) { + if let (Some(datum_var), Some(object), Some(property)) = + (&self.datum_var, &node.object, &node.property) + { + if let (Some(Expr::Identifier(object_id)), Some(property_expr)) = + (&object.expr, &property.expr) + { + if object_id.name == "datum" { + // This expression is a member expression on the datum free variable + if node.computed { + match property_expr { + Expr::Literal(Literal { + value: Some(Value::String(name)), + .. + }) => { + // Found `datum['col_name']` usage + self.dataset_column_usage = self + .dataset_column_usage + .with_column_usage(datum_var, ColumnUsage::from(name.as_str())); + } + _ => { + // Unknown usage (e.g. `datum['col_' + 'name']`) + self.dataset_column_usage = + self.dataset_column_usage.with_unknown_usage(datum_var); + } + } + } else { + match property_expr { + Expr::Identifier(id) => { + // Found `datum.col_name` usage + self.dataset_column_usage = + self.dataset_column_usage.with_column_usage( + datum_var, + ColumnUsage::from(id.name.as_str()), + ); + } + _ => { + // Unknown datum usage + self.dataset_column_usage = + self.dataset_column_usage.with_unknown_usage(datum_var); + } + } + } + } + } + } + } + + fn visit_call(&mut self, node: &CallExpression) { + // Handle data functions + if ALL_DATA_FNS.contains(node.callee.as_str()) { + // First argument should be a string + if let Some(Expression { + expr: + Some(Expr::Literal(Literal { + value: Some(Value::String(reference_data_name)), + .. + })), + .. + }) = node.arguments.get(0) + { + // Resolve data variable + let reference_data_var = Variable::new_data(reference_data_name); + if let Ok(resolved) = self + .task_scope + .resolve_scope(&reference_data_var, self.usage_scope) + { + let scoped_reference_data_var: ScopedVariable = (resolved.var, resolved.scope); + // e.g. data('other_dataset') + // We don't know which columns in the referenced dataset are used + self.dataset_column_usage = self + .dataset_column_usage + .with_unknown_usage(&scoped_reference_data_var); + + // Handle vlSelectionTest, which also uses datum columns + if node.callee == "vlSelectionTest" { + if let Some(datum_var) = self.datum_var { + if let Some(fields) = + self.vl_selection_fields.get(&scoped_reference_data_var) + { + // Add selection fields to usage for datum + self.dataset_column_usage = + self.dataset_column_usage.with_column_usage( + datum_var, + ColumnUsage::from(fields.as_slice()), + ); + } else { + // Unknown fields dataset, so we don't know which datum columns + // are needed at runtime + self.dataset_column_usage = + self.dataset_column_usage.with_unknown_usage(datum_var); + } + } + } + } else { + // Unknown brushing dataset, so we don't know which datum columns + // are needed at runtime + if let Some(datum_var) = self.datum_var { + self.dataset_column_usage = + self.dataset_column_usage.with_unknown_usage(datum_var); + } + } + } + } + } +} diff --git a/vegafusion-core/src/planning/extract.rs b/vegafusion-core/src/planning/extract.rs index 9c5548e5c..e775b5eee 100644 --- a/vegafusion-core/src/planning/extract.rs +++ b/vegafusion-core/src/planning/extract.rs @@ -198,11 +198,13 @@ impl<'a> MutChartVisitor for ExtractServerDependenciesVisitor<'a> { type_: "group".to_string(), name: None, from: None, + sort: None, encode: None, data: vec![], signals: vec![], marks: vec![], scales: vec![], + transform: vec![], extra: Default::default(), }; if parent_scope.is_empty() { diff --git a/vegafusion-core/src/planning/mod.rs b/vegafusion-core/src/planning/mod.rs index 229227ecc..1d9c781bb 100644 --- a/vegafusion-core/src/planning/mod.rs +++ b/vegafusion-core/src/planning/mod.rs @@ -10,6 +10,7 @@ pub mod dependency_graph; pub mod extract; pub mod optimize_server; pub mod plan; +pub mod projection_pushdown; pub mod split_domain_data; pub mod stitch; pub mod stringify_local_datetimes; diff --git a/vegafusion-core/src/planning/plan.rs b/vegafusion-core/src/planning/plan.rs index 7d04321bc..06b19cf99 100644 --- a/vegafusion-core/src/planning/plan.rs +++ b/vegafusion-core/src/planning/plan.rs @@ -9,6 +9,7 @@ use crate::error::Result; use crate::planning::extract::extract_server_data; use crate::planning::optimize_server::split_data_url_nodes; +use crate::planning::projection_pushdown::projection_pushdown; use crate::planning::split_domain_data::split_domain_data; use crate::planning::stitch::{stitch_specs, CommPlan}; use crate::planning::stringify_local_datetimes::stringify_local_datetimes; @@ -19,6 +20,7 @@ pub struct PlannerConfig { pub split_domain_data: bool, pub split_url_data_nodes: bool, pub stringify_local_datetimes: bool, + pub projection_pushdown: bool, } impl Default for PlannerConfig { @@ -27,6 +29,7 @@ impl Default for PlannerConfig { split_domain_data: true, split_url_data_nodes: true, stringify_local_datetimes: false, + projection_pushdown: true, } } } @@ -40,6 +43,14 @@ pub struct SpecPlan { impl SpecPlan { pub fn try_new(full_spec: &ChartSpec, config: &PlannerConfig) -> Result { let mut client_spec = full_spec.clone(); + + // Attempt to limit the columns produced by each dataset to only include those + // that are actually used downstream + if config.projection_pushdown { + projection_pushdown(&mut client_spec)?; + } + + // Split datasets that contain a mix of supported and unsupported transforms if config.split_domain_data { split_domain_data(&mut client_spec)?; } diff --git a/vegafusion-core/src/planning/projection_pushdown.rs b/vegafusion-core/src/planning/projection_pushdown.rs new file mode 100644 index 000000000..82b94ccc7 --- /dev/null +++ b/vegafusion-core/src/planning/projection_pushdown.rs @@ -0,0 +1,775 @@ +use crate::error::Result; +use crate::expression::column_usage::{ + ColumnUsage, DatasetsColumnUsage, GetDatasetsColumnUsage, VlSelectionFields, +}; +use crate::expression::parser::parse; +use crate::proto::gen::tasks::Variable; +use crate::spec::chart::{ChartSpec, MutChartVisitor}; +use crate::spec::data::DataSpec; +use crate::spec::mark::{MarkEncodeSpec, MarkEncodingField, MarkEncodingSpec, MarkSpec}; +use crate::spec::scale::{ScaleDataReferenceSpec, ScaleDomainSpec, ScaleRangeSpec, ScaleSpec}; +use crate::spec::signal::{SignalOnEventSpec, SignalSpec}; +use crate::spec::transform::project::ProjectTransformSpec; +use crate::spec::transform::TransformSpec; +use crate::task_graph::graph::ScopedVariable; +use crate::task_graph::scope::TaskScope; +use itertools::sorted; + +/// This planning phase attempts to identify the precise subset of columns that are required +/// of each dataset. If this can be determined for a particular dataset, then a projection +/// transform is appended to the dataset's transform array. If it cannot be determined, then +/// no change is made. +pub fn projection_pushdown(chart_spec: &mut ChartSpec) -> Result<()> { + let datum_var = None; + let usage_scope = Vec::new(); + let task_scope = chart_spec.to_task_scope()?; + + // Note: In the future, we may attempt to identify the fields that are required in the + // presence of a call to vlSelectionTest. Since we don't do this yet, vl_selection_fields is + // empty (meaning we don't know which fields are used by vlSelectionTest). + let vl_selection_fields = Default::default(); + + let datasets_column_usage = chart_spec.datasets_column_usage( + &datum_var, + usage_scope.as_slice(), + &task_scope, + &vl_selection_fields, + ); + + let mut visitor = InsertProjectionVisitor::new(&datasets_column_usage); + chart_spec.walk_mut(&mut visitor)?; + Ok(()) +} + +impl GetDatasetsColumnUsage for MarkEncodingField { + fn datasets_column_usage( + &self, + datum_var: &Option, + _usage_scope: &[u32], + _task_scope: &TaskScope, + _vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + if let Some(datum_var) = datum_var { + let column_usage = match self { + MarkEncodingField::Field(field) => { + if field.contains('.') || field.contains('[') { + // Specification of a nested column like "target['x']" or "source.x" + // (https://vega.github.io/vega/docs/types/#Field) + // Eventually we could add a separate parser to identify the column portion, + // but for now just declare as unknown column usage + ColumnUsage::Unknown + } else { + ColumnUsage::empty().with_column(field) + } + } + MarkEncodingField::Object(_) => { + // Field is an object that should have a "field" property. + // Eventually we can add support for this form, for now declare as unknown + // column usage + ColumnUsage::Unknown + } + }; + DatasetsColumnUsage::empty().with_column_usage(datum_var, column_usage) + } else { + DatasetsColumnUsage::empty() + } + } +} + +impl GetDatasetsColumnUsage for MarkEncodingSpec { + fn datasets_column_usage( + &self, + datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + let mut usage = DatasetsColumnUsage::empty(); + + if let Some(datum_var) = datum_var { + // Handle direct field references + if let Some(field) = &self.field { + usage = usage.union(&field.datasets_column_usage( + &Some(datum_var.clone()), + usage_scope, + task_scope, + vl_selection_fields, + )) + } + + // Handle signal + if let Some(signal) = &self.signal { + match parse(signal) { + Ok(parsed) => { + usage = usage.union(&parsed.datasets_column_usage( + &Some(datum_var.clone()), + usage_scope, + task_scope, + vl_selection_fields, + )) + } + Err(_) => { + // Failed to parse expression, unknown column usage + usage = usage.with_unknown_usage(datum_var); + } + } + } + + // Handle test expression + if let Some(signal) = &self.test { + match parse(signal) { + Ok(parsed) => { + usage = usage.union(&parsed.datasets_column_usage( + &Some(datum_var.clone()), + usage_scope, + task_scope, + vl_selection_fields, + )) + } + Err(_) => { + // Failed to parse expression, unknown column usage + usage = usage.with_unknown_usage(datum_var); + } + } + } + } + usage + } +} + +impl GetDatasetsColumnUsage for MarkEncodeSpec { + fn datasets_column_usage( + &self, + datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + // Initialize empty usage + let mut usage = DatasetsColumnUsage::empty(); + + // Iterate over all encoding channels + for encoding_spec in self.encodings.values() { + for encoding_or_list in encoding_spec.channels.values() { + for encoding in encoding_or_list.to_vec() { + usage = usage.union(&encoding.datasets_column_usage( + datum_var, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + } + } + + usage + } +} + +impl GetDatasetsColumnUsage for MarkSpec { + fn datasets_column_usage( + &self, + _datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + // Initialize empty usage + let mut usage = DatasetsColumnUsage::empty(); + if self.type_ == "group" { + // group marks with data, signals, scales, marks + for sig in &self.signals { + usage = usage.union(&sig.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + + for scale in &self.scales { + usage = usage.union(&scale.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + + for data in &self.data { + usage = usage.union(&data.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + + // Handle group from->facet->name. In this case, a new dataset is named for the + // subsets of the input dataset. For now, this means we don't know what columns + // from the input dataset are used. In the future, we could track which columns of + // the subset datasets are used. + if let Some(facet) = self.from.as_ref().and_then(|from| from.facet.clone()) { + let facet_data_var = Variable::new_data(&facet.data); + if let Ok(resolved) = task_scope.resolve_scope(&facet_data_var, usage_scope) { + let scoped_facet_data_var = (resolved.var, resolved.scope); + usage = usage.with_unknown_usage(&scoped_facet_data_var); + } + } + + // Handle group mark with from->data. For now, this results in unknown usage because + // the data columns can be used by outside of the encoding channels + // (e.g. in the title object) with the parent variable + if let Some(data) = self.from.as_ref().and_then(|from| from.data.clone()) { + let from_data_var = Variable::new_data(&data); + if let Ok(resolved) = task_scope.resolve_scope(&from_data_var, usage_scope) { + let scoped_from_data_var = (resolved.var, resolved.scope); + usage = usage.with_unknown_usage(&scoped_from_data_var); + } + } + + let mut child_group_idx = 0; + for mark in &self.marks { + if mark.type_ == "group" { + let mut child_usage_scope = Vec::from(usage_scope); + child_usage_scope.push(child_group_idx as u32); + usage = usage.union(&mark.datasets_column_usage( + &None, + child_usage_scope.as_slice(), + task_scope, + vl_selection_fields, + )); + child_group_idx += 1; + } else { + usage = usage.union(&mark.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + } + } else { + // non-group marks + if let Some(from) = &self.from { + if let Some(data_name) = &from.data { + let data_var = Variable::new_data(data_name); + if let Ok(resolved) = task_scope.resolve_scope(&data_var, usage_scope) { + let scoped_datum_var: ScopedVariable = (resolved.var, resolved.scope); + if let Some(encode) = &self.encode { + usage = usage.union(&encode.datasets_column_usage( + &Some(scoped_datum_var.clone()), + usage_scope, + task_scope, + vl_selection_fields, + )) + } + + // Handle sort expression + if let Some(sort) = &self.sort { + let sort_fields = sort.field.to_vec(); + for sort_field in sort_fields { + if let Ok(parsed) = parse(&sort_field) { + usage = usage.union(&parsed.datasets_column_usage( + &Some(scoped_datum_var.clone()), + usage_scope, + task_scope, + vl_selection_fields, + )); + } + } + } + + // Check for mark-level transforms. We don't look inside of these yet, + // so we don't know which columns are used + if !self.transform.is_empty() { + usage = usage.with_unknown_usage(&scoped_datum_var); + } + } + } + } + } + + // All marks with "from" data source + + usage + } +} + +impl GetDatasetsColumnUsage for ScaleDataReferenceSpec { + fn datasets_column_usage( + &self, + _datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + _vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + let mut usage = DatasetsColumnUsage::empty(); + let data_var = Variable::new_data(&self.data); + if let Ok(resolved) = task_scope.resolve_scope(&data_var, usage_scope) { + let scoped_datum_var: ScopedVariable = (resolved.var, resolved.scope); + usage = + usage.with_column_usage(&scoped_datum_var, ColumnUsage::from(self.field.as_str())) + } + + usage + } +} + +impl GetDatasetsColumnUsage for ScaleDomainSpec { + fn datasets_column_usage( + &self, + _datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + let mut usage = DatasetsColumnUsage::empty(); + let scale_data_refs = match &self { + ScaleDomainSpec::FieldReference(field) => { + vec![field.clone()] + } + ScaleDomainSpec::FieldsReference(fields) => fields.fields.clone(), + _ => Vec::new(), + }; + for scale_data_ref in scale_data_refs { + usage = usage.union(&scale_data_ref.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + usage + } +} + +impl GetDatasetsColumnUsage for ScaleRangeSpec { + fn datasets_column_usage( + &self, + _datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + let mut usage = DatasetsColumnUsage::empty(); + if let ScaleRangeSpec::Reference(data_ref) = &self { + usage = usage.union(&data_ref.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + usage + } +} + +impl GetDatasetsColumnUsage for ScaleSpec { + fn datasets_column_usage( + &self, + _datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + let mut usage = DatasetsColumnUsage::empty(); + if let Some(domain) = &self.domain { + usage = usage.union(&domain.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + + if let Some(range) = &self.range { + usage = usage.union(&range.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + usage + } +} + +impl GetDatasetsColumnUsage for SignalSpec { + fn datasets_column_usage( + &self, + _datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + let mut usage = DatasetsColumnUsage::empty(); + let mut expr_strs = Vec::new(); + + // Collect all expression strings used in the signal definition + // init + if let Some(init) = &self.init { + expr_strs.push(init.clone()) + } + + // update + if let Some(update) = &self.update { + expr_strs.push(update.clone()) + } + + // on + for sig_on in &self.on { + expr_strs.push(sig_on.update.clone()); + for sig_event in sig_on.events.to_vec() { + if let SignalOnEventSpec::Signal(signal) = sig_event { + expr_strs.push(signal.signal.clone()); + } + } + } + + for expr_str in expr_strs { + if let Ok(parsed) = parse(&expr_str) { + usage = usage.union(&parsed.datasets_column_usage( + &None, + usage_scope, + task_scope, + vl_selection_fields, + )) + } + } + + usage + } +} + +impl GetDatasetsColumnUsage for DataSpec { + fn datasets_column_usage( + &self, + _datum_var: &Option, + usage_scope: &[u32], + task_scope: &TaskScope, + _vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + let mut usage = DatasetsColumnUsage::empty(); + if let Some(source) = &self.source { + // For right now, assume that all columns in source dataset are required by this + // dataset. Eventually we'll want to examine the individual transforms in this dataset + // to determine the precise subset of columns that are required. + let source_var = Variable::new_data(source); + if let Ok(resolved) = task_scope.resolve_scope(&source_var, usage_scope) { + let data_var = (resolved.var, resolved.scope); + usage = usage.with_unknown_usage(&data_var); + } + } + + // Check for lookup transform and ensure that all columns are kept from the looked up + // dataset + for tx in &self.transform { + if let TransformSpec::Lookup(lookup) = tx { + let lookup_from_var = Variable::new_data(&lookup.from); + if let Ok(resolved) = task_scope.resolve_scope(&lookup_from_var, usage_scope) { + let lookup_data_var = (resolved.var, resolved.scope); + usage = usage.with_unknown_usage(&lookup_data_var); + } + } + } + usage + } +} + +impl GetDatasetsColumnUsage for ChartSpec { + fn datasets_column_usage( + &self, + _datum_var: &Option, + _usage_scope: &[u32], + task_scope: &TaskScope, + vl_selection_fields: &VlSelectionFields, + ) -> DatasetsColumnUsage { + // Initialize empty usage + let mut usage = DatasetsColumnUsage::empty(); + + // group marks with data, signals, scales, marks + for sig in &self.signals { + usage = + usage.union(&sig.datasets_column_usage(&None, &[], task_scope, vl_selection_fields)) + } + + for scale in &self.scales { + usage = usage.union(&scale.datasets_column_usage( + &None, + &[], + task_scope, + vl_selection_fields, + )) + } + + for data in &self.data { + usage = usage.union(&data.datasets_column_usage( + &None, + &[], + task_scope, + vl_selection_fields, + )) + } + + let mut child_group_idx = 0; + for mark in &self.marks { + if mark.type_ == "group" { + let child_usage_scope = vec![child_group_idx as u32]; + usage = usage.union(&mark.datasets_column_usage( + &None, + child_usage_scope.as_slice(), + task_scope, + vl_selection_fields, + )); + child_group_idx += 1; + } else { + usage = usage.union(&mark.datasets_column_usage( + &None, + &[], + task_scope, + vl_selection_fields, + )) + } + } + + usage + } +} + +/// Visitor to collect the non-UTC time scales +struct InsertProjectionVisitor<'a> { + pub columns_usage: &'a DatasetsColumnUsage, +} + +impl<'a> InsertProjectionVisitor<'a> { + pub fn new(columns_usage: &'a DatasetsColumnUsage) -> Self { + Self { columns_usage } + } +} + +impl<'a> MutChartVisitor for InsertProjectionVisitor<'a> { + fn visit_data(&mut self, data: &mut DataSpec, scope: &[u32]) -> Result<()> { + let data_var = Variable::new_data(&data.name); + let scoped_data_var = (data_var, Vec::from(scope)); + if let Some(ColumnUsage::Known(columns)) = self.columns_usage.usages.get(&scoped_data_var) { + if !columns.is_empty() { + // We know exactly which columns are required of this dataset (and it's not none), + // so we can append a projection transform to limit the columns that are produced + let proj_fields: Vec<_> = sorted(columns).cloned().collect(); + let proj_transform = TransformSpec::Project(ProjectTransformSpec { + fields: proj_fields, + extra: Default::default(), + }); + let transforms = &mut data.transform; + transforms.push(proj_transform); + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::expression::column_usage::{ + ColumnUsage, DatasetsColumnUsage, GetDatasetsColumnUsage, VlSelectionFields, + }; + use crate::proto::gen::tasks::Variable; + use crate::spec::data::DataSpec; + use crate::spec::mark::{MarkEncodeSpec, MarkSpec}; + use crate::spec::scale::ScaleSpec; + use crate::spec::signal::SignalSpec; + use crate::task_graph::graph::ScopedVariable; + use crate::task_graph::scope::TaskScope; + use serde_json::json; + + fn selection_fields() -> VlSelectionFields { + vec![( + (Variable::new_data("brush2_store"), Vec::new()), + vec!["AA".to_string(), "BB".to_string(), "CC".to_string()], + )] + .into_iter() + .collect() + } + + fn task_scope() -> TaskScope { + let mut task_scope = TaskScope::new(); + task_scope + .add_variable(&Variable::new_data("brush2_store"), &[]) + .unwrap(); + task_scope + .add_variable(&Variable::new_data("dataA"), &[]) + .unwrap(); + task_scope + } + + #[test] + fn test_mark_encoding_column_known_usage() { + // Define selection dataset fields + let selection_fields = selection_fields(); + + let encodings: MarkEncodeSpec = serde_json::from_value(json!({ + "update": { + "x": {"field": "one", "scale": "scale_a"}, + "y": [ + {"field": "three", "scale": "scale_a", "test": "datum.two > 7"}, + {"value": 23}, + ], + "opacity": [ + {"signal": "datum['four'] * 2", "test": "vlSelectionTest('brush2_store', datum)"}, + {"value": 0.3}, + ] + } + })).unwrap(); + + // Build dataset_column_usage args + let datum_var: ScopedVariable = (Variable::new_data("dataA"), Vec::new()); + let usage_scope = Vec::new(); + let task_scope = task_scope(); + + let usage = encodings.datasets_column_usage( + &Some(datum_var.clone()), + &usage_scope, + &task_scope, + &selection_fields, + ); + + let expected = DatasetsColumnUsage::empty() + .with_column_usage( + &datum_var, + ColumnUsage::from(vec!["AA", "BB", "CC", "one", "two", "three", "four"].as_slice()), + ) + .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new())); + + assert_eq!(usage, expected); + + // // Without selection fields column usage should be unknown + let usage = encodings.datasets_column_usage( + &Some(datum_var.clone()), + &usage_scope, + &task_scope, + &Default::default(), + ); + let expected = DatasetsColumnUsage::empty() + .with_unknown_usage(&datum_var) + .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new())); + + assert_eq!(usage, expected); + } + + #[test] + fn test_mark_with_known_usage() { + // Define selection dataset fields + let selection_fields = selection_fields(); + + let mark: MarkSpec = serde_json::from_value(json!({ + "type": "rect", + "from": {"data": "dataA"}, + "encode": { + "init": { + "x": {"field": "one", "scale": "scale_a"}, + "y": [ + {"field": "three", "scale": "scale_a", "test": "datum.two > 7"}, + {"value": 23}, + ], + }, + "update": { + "opacity": [ + {"signal": "datum['four'] * 2", "test": "vlSelectionTest('brush2_store', datum)"}, + {"value": 0.3}, + ] + } + } + })).unwrap(); + + // Build dataset_column_usage args + let usage_scope = Vec::new(); + let task_scope = task_scope(); + + let usage = mark.datasets_column_usage(&None, &usage_scope, &task_scope, &selection_fields); + + let expected = DatasetsColumnUsage::empty() + .with_column_usage( + &(Variable::new_data("dataA"), Vec::new()), + ColumnUsage::from(vec!["AA", "BB", "CC", "one", "two", "three", "four"].as_slice()), + ) + .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new())); + + assert_eq!(usage, expected); + } + + #[test] + fn test_scale_usage() { + let scale: ScaleSpec = serde_json::from_value(json!({ + "name": "color", + "scale": "quantize", + "domain": {"data": "dataA", "field": "colZ"}, + "range": {"scheme": "blues", "count": 7} + })) + .unwrap(); + + // Build dataset_column_usage args + let usage_scope = Vec::new(); + let task_scope = task_scope(); + + let usage = + scale.datasets_column_usage(&None, &usage_scope, &task_scope, &Default::default()); + + let expected = DatasetsColumnUsage::empty().with_column_usage( + &(Variable::new_data("dataA"), Vec::new()), + ColumnUsage::from(vec!["colZ"].as_slice()), + ); + + assert_eq!(usage, expected); + } + + #[test] + fn test_signal_usage() { + let signal: SignalSpec = serde_json::from_value(json!({ + "name": "indexDate", + "description": "A date value that updates in response to mousemove.", + "update": "length(data('brush2_store'))", + "on": [{"events": "mousemove", "update": "length(data('dataA'))"}] + })) + .unwrap(); + + // Build dataset_column_usage args + let usage_scope = Vec::new(); + let task_scope = task_scope(); + + let usage = + signal.datasets_column_usage(&None, &usage_scope, &task_scope, &Default::default()); + + println!("{:#?}", usage); + + let expected = DatasetsColumnUsage::empty() + .with_unknown_usage(&(Variable::new_data("brush2_store"), Vec::new())) + .with_unknown_usage(&(Variable::new_data("dataA"), Vec::new())); + + assert_eq!(usage, expected); + } + + #[test] + fn test_data_usage() { + let dataset: DataSpec = serde_json::from_value(json!({ + "name": "dataB", + "source": "dataA", + "transform": [] + })) + .unwrap(); + + // Build dataset_column_usage args + let usage_scope = Vec::new(); + let task_scope = task_scope(); + + let usage = + dataset.datasets_column_usage(&None, &usage_scope, &task_scope, &Default::default()); + + println!("{:#?}", usage); + + let expected = DatasetsColumnUsage::empty() + .with_unknown_usage(&(Variable::new_data("dataA"), Vec::new())); + + assert_eq!(usage, expected); + } +} diff --git a/vegafusion-core/src/proto/prost_gen/transforms.rs b/vegafusion-core/src/proto/prost_gen/transforms.rs index b512ad3eb..5433c1c16 100644 --- a/vegafusion-core/src/proto/prost_gen/transforms.rs +++ b/vegafusion-core/src/proto/prost_gen/transforms.rs @@ -153,10 +153,17 @@ pub struct WindowFrame { #[prost(int64, optional, tag="2")] pub end: ::core::option::Option, } +// Project + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Project { + #[prost(string, repeated, tag="1")] + pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} /// Top-level transform #[derive(Clone, PartialEq, ::prost::Message)] pub struct Transform { - #[prost(oneof="transform::TransformKind", tags="1, 2, 3, 4, 5, 6, 7, 8, 9")] + #[prost(oneof="transform::TransformKind", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10")] pub transform_kind: ::core::option::Option, } /// Nested message and enum types in `Transform`. @@ -181,6 +188,8 @@ pub mod transform { Joinaggregate(super::JoinAggregate), #[prost(message, tag="9")] Window(super::Window), + #[prost(message, tag="10")] + Project(super::Project), } } #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/vegafusion-core/src/proto/tonic_gen/transforms.rs b/vegafusion-core/src/proto/tonic_gen/transforms.rs index b512ad3eb..5433c1c16 100644 --- a/vegafusion-core/src/proto/tonic_gen/transforms.rs +++ b/vegafusion-core/src/proto/tonic_gen/transforms.rs @@ -153,10 +153,17 @@ pub struct WindowFrame { #[prost(int64, optional, tag="2")] pub end: ::core::option::Option, } +// Project + +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Project { + #[prost(string, repeated, tag="1")] + pub fields: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} /// Top-level transform #[derive(Clone, PartialEq, ::prost::Message)] pub struct Transform { - #[prost(oneof="transform::TransformKind", tags="1, 2, 3, 4, 5, 6, 7, 8, 9")] + #[prost(oneof="transform::TransformKind", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10")] pub transform_kind: ::core::option::Option, } /// Nested message and enum types in `Transform`. @@ -181,6 +188,8 @@ pub mod transform { Joinaggregate(super::JoinAggregate), #[prost(message, tag="9")] Window(super::Window), + #[prost(message, tag="10")] + Project(super::Project), } } #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/vegafusion-core/src/proto/transforms.proto b/vegafusion-core/src/proto/transforms.proto index 3b0a70c4c..3375789ab 100644 --- a/vegafusion-core/src/proto/transforms.proto +++ b/vegafusion-core/src/proto/transforms.proto @@ -184,6 +184,11 @@ message WindowFrame { optional int64 end = 2; } +// Project + +message Project { + repeated string fields = 1; +} // Top-level transform message Transform { @@ -197,6 +202,7 @@ message Transform { TimeUnit timeunit = 7; JoinAggregate joinaggregate = 8; Window window = 9; + Project project = 10; } } diff --git a/vegafusion-core/src/spec/mark.rs b/vegafusion-core/src/spec/mark.rs index e65b2a1cb..6b77db89c 100644 --- a/vegafusion-core/src/spec/mark.rs +++ b/vegafusion-core/src/spec/mark.rs @@ -11,6 +11,7 @@ use crate::spec::chart::{ChartVisitor, MutChartVisitor}; use crate::spec::data::DataSpec; use crate::spec::scale::ScaleSpec; use crate::spec::signal::SignalSpec; +use crate::spec::values::StringOrStringList; use serde::{Deserialize, Serialize}; use serde_json::{Number, Value}; use std::collections::HashMap; @@ -26,6 +27,9 @@ pub struct MarkSpec { #[serde(skip_serializing_if = "Option::is_none")] pub from: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub sort: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub encode: Option, @@ -41,6 +45,9 @@ pub struct MarkSpec { #[serde(default, skip_serializing_if = "Vec::is_empty")] pub scales: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub transform: Vec, + #[serde(flatten)] pub extra: HashMap, } @@ -214,3 +221,11 @@ pub enum MarkEncodingField { Field(String), Object(Value), } + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct MarkSort { + pub field: StringOrStringList, + + #[serde(flatten)] + pub extra: HashMap, +} diff --git a/vegafusion-core/src/spec/transform/mod.rs b/vegafusion-core/src/spec/transform/mod.rs index 9792fc09a..7a97b0f77 100644 --- a/vegafusion-core/src/spec/transform/mod.rs +++ b/vegafusion-core/src/spec/transform/mod.rs @@ -14,6 +14,7 @@ pub mod filter; pub mod formula; pub mod joinaggregate; pub mod lookup; +pub mod project; pub mod sequence; pub mod timeunit; pub mod unsupported; @@ -28,6 +29,7 @@ use crate::spec::transform::collect::CollectTransformSpec; use crate::spec::transform::formula::FormulaTransformSpec; use crate::spec::transform::joinaggregate::JoinAggregateTransformSpec; use crate::spec::transform::lookup::LookupTransformSpec; +use crate::spec::transform::project::ProjectTransformSpec; use crate::spec::transform::sequence::SequenceTransformSpec; use crate::spec::transform::timeunit::TimeUnitTransformSpec; use crate::spec::transform::unsupported::*; @@ -48,6 +50,7 @@ pub enum TransformSpec { Timeunit(TimeUnitTransformSpec), JoinAggregate(JoinAggregateTransformSpec), Window(WindowTransformSpec), + Project(ProjectTransformSpec), // Unsupported CountPattern(CountpatternTransformSpec), @@ -79,7 +82,6 @@ pub enum TransformSpec { Partition(PartitionTransformSpec), Pie(PieTransformSpec), Pivot(PivotTransformSpec), - Project(ProjectTransformSpec), Quantile(QuantileTransformSpec), Regression(RegressionTransformSpec), ResolveFilter(ResolvefilterTransformSpec), diff --git a/vegafusion-core/src/spec/transform/project.rs b/vegafusion-core/src/spec/transform/project.rs new file mode 100644 index 000000000..6c82210ad --- /dev/null +++ b/vegafusion-core/src/spec/transform/project.rs @@ -0,0 +1,34 @@ +/* + * VegaFusion + * Copyright (C) 2022 VegaFusion Technologies LLC + * + * This program is distributed under multiple licenses. + * Please consult the license documentation provided alongside + * this program the details of the active license. + */ +use crate::spec::transform::TransformSpecTrait; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; + +use crate::error::Result; +use crate::task_graph::task::InputVariable; + +/// Struct that serializes to Vega spec for the filter transform +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ProjectTransformSpec { + pub fields: Vec, + + #[serde(flatten)] + pub extra: HashMap, +} + +impl TransformSpecTrait for ProjectTransformSpec { + fn supported(&self) -> bool { + true + } + + fn input_vars(&self) -> Result> { + Ok(Default::default()) + } +} diff --git a/vegafusion-core/src/spec/transform/unsupported.rs b/vegafusion-core/src/spec/transform/unsupported.rs index 273f503dc..1f8ba61ec 100644 --- a/vegafusion-core/src/spec/transform/unsupported.rs +++ b/vegafusion-core/src/spec/transform/unsupported.rs @@ -57,7 +57,6 @@ unsupported_transforms!( PartitionTransformSpec, PieTransformSpec, PivotTransformSpec, - ProjectTransformSpec, QuantileTransformSpec, RegressionTransformSpec, ResolvefilterTransformSpec, diff --git a/vegafusion-core/src/transform/mod.rs b/vegafusion-core/src/transform/mod.rs index e583c0f16..0c6ad2716 100644 --- a/vegafusion-core/src/transform/mod.rs +++ b/vegafusion-core/src/transform/mod.rs @@ -9,7 +9,9 @@ use crate::error::VegaFusionError; use crate::proto::gen::tasks::Variable; use crate::proto::gen::transforms::transform::TransformKind; -use crate::proto::gen::transforms::{Aggregate, Bin, Collect, Extent, Filter, Formula, TimeUnit}; +use crate::proto::gen::transforms::{ + Aggregate, Bin, Collect, Extent, Filter, Formula, Project, TimeUnit, +}; use crate::proto::gen::transforms::{JoinAggregate, Transform, Window}; use crate::spec::transform::TransformSpec; use crate::task_graph::task::InputVariable; @@ -23,6 +25,7 @@ pub mod filter; pub mod formula; pub mod joinaggregate; pub mod pipeline; +pub mod project; pub mod timeunit; pub mod window; @@ -42,6 +45,7 @@ impl TryFrom<&TransformSpec> for TransformKind { Self::Joinaggregate(JoinAggregate::new(tx_spec)) } TransformSpec::Window(tx_spec) => Self::Window(Window::try_new(tx_spec)?), + TransformSpec::Project(tx_spec) => Self::Project(Project::try_new(tx_spec)?), _ => { return Err(VegaFusionError::parse(&format!( "Unsupported transform: {:?}", @@ -74,6 +78,7 @@ impl TransformKind { TransformKind::Timeunit(tx) => tx, TransformKind::Joinaggregate(tx) => tx, TransformKind::Window(tx) => tx, + TransformKind::Project(tx) => tx, } } } diff --git a/vegafusion-core/src/transform/project.rs b/vegafusion-core/src/transform/project.rs new file mode 100644 index 000000000..24d01eb05 --- /dev/null +++ b/vegafusion-core/src/transform/project.rs @@ -0,0 +1,28 @@ +/* + * VegaFusion + * Copyright (C) 2022 VegaFusion Technologies LLC + * + * This program is distributed under multiple licenses. + * Please consult the license documentation provided alongside + * this program the details of the active license. + */ +use crate::error::Result; +use crate::proto::gen::transforms::Project; +use crate::spec::transform::project::ProjectTransformSpec; +use crate::transform::TransformDependencies; + +use crate::task_graph::task::InputVariable; + +impl Project { + pub fn try_new(spec: &ProjectTransformSpec) -> Result { + Ok(Self { + fields: spec.fields.clone(), + }) + } +} + +impl TransformDependencies for Project { + fn input_vars(&self) -> Vec { + Default::default() + } +} diff --git a/vegafusion-rt-datafusion/src/transform/mod.rs b/vegafusion-rt-datafusion/src/transform/mod.rs index e0ec3d080..4c04f63eb 100644 --- a/vegafusion-rt-datafusion/src/transform/mod.rs +++ b/vegafusion-rt-datafusion/src/transform/mod.rs @@ -14,6 +14,7 @@ pub mod filter; pub mod formula; pub mod joinaggregate; pub mod pipeline; +pub mod project; pub mod timeunit; pub mod utils; pub mod window; @@ -50,6 +51,7 @@ pub fn to_transform_trait(tx: &TransformKind) -> &dyn TransformTrait { TransformKind::Timeunit(tx) => tx, TransformKind::Joinaggregate(tx) => tx, TransformKind::Window(tx) => tx, + TransformKind::Project(tx) => tx, } } diff --git a/vegafusion-rt-datafusion/src/transform/project.rs b/vegafusion-rt-datafusion/src/transform/project.rs new file mode 100644 index 000000000..1aa6012ac --- /dev/null +++ b/vegafusion-rt-datafusion/src/transform/project.rs @@ -0,0 +1,55 @@ +/* + * VegaFusion + * Copyright (C) 2022 VegaFusion Technologies LLC + * + * This program is distributed under multiple licenses. + * Please consult the license documentation provided alongside + * this program the details of the active license. + */ +use crate::expression::compiler::config::CompilationConfig; +use crate::transform::TransformTrait; +use datafusion::dataframe::DataFrame; +use std::collections::HashSet; + +use std::sync::Arc; +use vegafusion_core::error::Result; +use vegafusion_core::proto::gen::transforms::Project; + +use async_trait::async_trait; +use vegafusion_core::task_graph::task_value::TaskValue; + +#[async_trait] +impl TransformTrait for Project { + async fn eval( + &self, + dataframe: Arc, + _config: &CompilationConfig, + ) -> Result<(Arc, Vec)> { + // Collect all dataframe fields into a HashSet for fast membership test + let all_fields: HashSet<_> = dataframe + .schema() + .fields() + .iter() + .map(|field| field.name().clone()) + .collect(); + + // Keep all of the project columns that are present in the dataframe. + // Skip projection fields that are not found + let select_fields: Vec<_> = self + .fields + .iter() + .filter_map(|field| { + if all_fields.contains(field) { + Some(field.clone()) + } else { + None + } + }) + .collect(); + + let select_field_strs: Vec<_> = select_fields.iter().map(|f| f.as_str()).collect(); + + let result = dataframe.select_columns(select_field_strs.as_slice())?; + Ok((result, Default::default())) + } +} diff --git a/vegafusion-rt-datafusion/tests/test_expression_parsing.rs b/vegafusion-rt-datafusion/tests/test_expression_parsing.rs index 7c2759d51..615e99b53 100644 --- a/vegafusion-rt-datafusion/tests/test_expression_parsing.rs +++ b/vegafusion-rt-datafusion/tests/test_expression_parsing.rs @@ -306,3 +306,102 @@ mod test_check_supported { #[test] fn test_marker() {} // Help IDE detect test module } + +mod test_column_usage { + use crate::*; + use vegafusion_core::expression::column_usage::{ + ColumnUsage, DatasetsColumnUsage, GetDatasetsColumnUsage, VlSelectionFields, + }; + use vegafusion_core::expression::parser::parse; + use vegafusion_core::proto::gen::tasks::Variable; + use vegafusion_core::task_graph::graph::ScopedVariable; + use vegafusion_core::task_graph::scope::TaskScope; + + #[rstest( + expr, + data_a_usage, + brush2_store_usage, + case("no_such_fn(23)", None, None), + case( + "isValid(datum[\"average_b\"]) && isFinite(+datum[\"average_b\"])", + Some(ColumnUsage::from(vec!["average_b"].as_slice())), + None, + ), + case( + "datum['one'] + datum.two", + Some(ColumnUsage::from(vec!["one", "two"].as_slice())), + None, + ), + case( + "datum.one + datum['two'] + datum['th' + 'ree']", + Some(ColumnUsage::Unknown), + None, + ), + case( + "vlSelectionTest(\"brush1_store\", datum)", + Some(ColumnUsage::Unknown), + None, + ), + case( + "vlSelectionTest(\"brush2_store\", datum)", + Some(ColumnUsage::from(vec!["AA", "BB", "CC"].as_slice())), + Some(ColumnUsage::Unknown), + ), + case( + "!length(data(\"brush2_store\")) || vlSelectionTest(\"brush2_store\", datum)", + Some(ColumnUsage::from(vec!["AA", "BB", "CC"].as_slice())), + Some(ColumnUsage::Unknown), + ), + case( + "data(\"brush2_store\")", + None, + Some(ColumnUsage::Unknown), + ), + )] + fn test( + expr: &str, + data_a_usage: Option, + brush2_store_usage: Option, + ) { + let expr = parse(expr).unwrap(); + + // Build expected usage + let data_a_var: ScopedVariable = (Variable::new_data("dataA"), Vec::new()); + let mut expected = DatasetsColumnUsage::empty(); + if let Some(data_a_usage) = data_a_usage { + expected = expected.with_column_usage(&data_a_var, data_a_usage); + } + if let Some(brush2_store_usage) = brush2_store_usage { + let brush2_store_var: ScopedVariable = (Variable::new_data("brush2_store"), Vec::new()); + expected = expected.with_column_usage(&brush2_store_var, brush2_store_usage); + } + + // Define selection dataset fields + let selection_fields: VlSelectionFields = vec![( + (Variable::new_data("brush2_store"), Vec::new()), + vec!["AA".to_string(), "BB".to_string(), "CC".to_string()], + )] + .into_iter() + .collect(); + + // Build dataset_column_usage args + let datum_var: ScopedVariable = (Variable::new_data("dataA"), Vec::new()); + let usage_scope = Vec::new(); + let mut task_scope = TaskScope::new(); + task_scope + .add_variable(&Variable::new_data("brush2_store"), &[]) + .unwrap(); + + // Compute dataset column usage + let usage = expr.datasets_column_usage( + &Some(datum_var), + &usage_scope, + &task_scope, + &selection_fields, + ); + assert_eq!(usage, expected); + } + + #[test] + fn test_marker() {} // Help IDE detect test module +} diff --git a/vegafusion-rt-datafusion/tests/test_image_comparison.rs b/vegafusion-rt-datafusion/tests/test_image_comparison.rs index aa7485d26..e46546c6b 100644 --- a/vegafusion-rt-datafusion/tests/test_image_comparison.rs +++ b/vegafusion-rt-datafusion/tests/test_image_comparison.rs @@ -265,7 +265,9 @@ mod test_vega_specs { // case("vega/parallel-coords", 0.001), // case("vega/splom-outer", 0.001), - case("vega/playfair", 0.001), + // // Named mark used as a dataset not supported + // case("vega/playfair", 0.001), + case("vega/population", 0.001), case("vega/quantile-dot-plot", 0.001), case("vega/regression", 0.001), diff --git a/vegafusion-rt-datafusion/tests/test_projection_pushdown.rs b/vegafusion-rt-datafusion/tests/test_projection_pushdown.rs new file mode 100644 index 000000000..0c2479718 --- /dev/null +++ b/vegafusion-rt-datafusion/tests/test_projection_pushdown.rs @@ -0,0 +1,47 @@ +#[cfg(test)] +mod test_custom_specs { + use crate::crate_dir; + use rstest::rstest; + use std::fs; + use vegafusion_core::planning::plan::{PlannerConfig, SpecPlan}; + use vegafusion_core::spec::chart::ChartSpec; + use vegafusion_core::spec::transform::TransformSpec; + + # [rstest( + spec_name, + data_index, + projection_fields, + case("vegalite/point_2d", 0, vec!["Horsepower", "Miles_per_Gallon"]), + case("vegalite/point_bubble", 0, vec!["Acceleration", "Horsepower", "Miles_per_Gallon"]), + )] + fn test(spec_name: &str, data_index: usize, projection_fields: Vec<&str>) { + // Load spec + let spec_path = format!("{}/tests/specs/{}.vg.json", crate_dir(), spec_name); + let spec_str = fs::read_to_string(spec_path).unwrap(); + let spec: ChartSpec = serde_json::from_str(&spec_str).unwrap(); + + let planner_config = PlannerConfig { + projection_pushdown: true, + ..Default::default() + }; + let spec_plan = SpecPlan::try_new(&spec, &planner_config).unwrap(); + let data = &spec_plan.server_spec.data[data_index]; + let tx = &data.transform[data.transform.len() - 1]; + + // Print data + // println!("{}", serde_json::to_string_pretty(&spec_plan.server_spec.data).unwrap()); + + if let TransformSpec::Project(project) = tx { + let expected_fields: Vec<_> = projection_fields.iter().map(|f| f.to_string()).collect(); + assert_eq!(project.fields, expected_fields); + } else { + panic!("Expected project transform") + } + } +} + +fn crate_dir() -> String { + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .display() + .to_string() +} diff --git a/vegafusion-rt-datafusion/tests/test_transform_project.rs b/vegafusion-rt-datafusion/tests/test_transform_project.rs new file mode 100644 index 000000000..691eca0ad --- /dev/null +++ b/vegafusion-rt-datafusion/tests/test_transform_project.rs @@ -0,0 +1,50 @@ +/* + * VegaFusion + * Copyright (C) 2022 VegaFusion Technologies LLC + * + * This program is distributed under multiple licenses. + * Please consult the license documentation provided alongside + * this program the details of the active license. + */ +#[macro_use] +extern crate lazy_static; +mod util; + +#[cfg(test)] +mod test_project { + use crate::util::check::check_transform_evaluation; + use crate::util::datasets::vega_json_dataset; + use rstest::rstest; + use vegafusion_core::spec::transform::project::ProjectTransformSpec; + use vegafusion_core::spec::transform::TransformSpec; + + #[rstest( + fields, + case(vec!["Beak Length (mm)", "Species"]), + case(vec!["Species", "Beak Length (mm)"]), + case(vec!["Species", "Beak Length (mm)", "Bogus"]), + )] + fn test(fields: Vec<&str>) { + let dataset = vega_json_dataset("penguins"); + + let fields: Vec<_> = fields.iter().map(|s| s.to_string()).collect(); + let project_spec = ProjectTransformSpec { + fields, + extra: Default::default(), + }; + let transform_specs = vec![TransformSpec::Project(project_spec)]; + + let comp_config = Default::default(); + let eq_config = Default::default(); + + check_transform_evaluation( + &dataset, + transform_specs.as_slice(), + &comp_config, + &eq_config, + ); + } + + #[test] + fn test_marker() {} // Help IDE detect test module +}