Skip to content

Commit

Permalink
Add initial impute transform support (#147)
Browse files Browse the repository at this point in the history
* Add impute JSON specification
* Add impute protobuf specification
* Add initial impute transform implementation
* Update new expected comm plans for test specs that use supported form of the impute transform
  • Loading branch information
jonmmease authored Jul 28, 2022
1 parent fc3a2a6 commit a3000c2
Show file tree
Hide file tree
Showing 21 changed files with 544 additions and 18 deletions.
27 changes: 26 additions & 1 deletion vegafusion-core/src/proto/prost_gen/transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,24 @@ pub struct Stack {
#[prost(string, optional, tag="7")]
pub alias_1: ::core::option::Option<::prost::alloc::string::String>,
}
/// Impute
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Impute {
#[prost(string, tag="1")]
pub field: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub key: ::prost::alloc::string::String,
#[prost(enumeration="ImputeMethod", tag="3")]
pub method: i32,
#[prost(string, repeated, tag="4")]
pub groupby: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, optional, tag="5")]
pub value_json: ::core::option::Option<::prost::alloc::string::String>,
}
/// Top-level transform
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transform {
#[prost(oneof="transform::TransformKind", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")]
#[prost(oneof="transform::TransformKind", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12")]
pub transform_kind: ::core::option::Option<transform::TransformKind>,
}
/// Nested message and enum types in `Transform`.
Expand Down Expand Up @@ -209,6 +223,8 @@ pub mod transform {
Project(super::Project),
#[prost(message, tag="11")]
Stack(super::Stack),
#[prost(message, tag="12")]
Impute(super::Impute),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -295,3 +311,12 @@ pub enum StackOffset {
Center = 1,
Normalize = 2,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum ImputeMethod {
ImputeValue = 0,
ImputeMean = 1,
ImputeMedian = 2,
ImputeMax = 3,
ImputeMin = 4,
}
27 changes: 26 additions & 1 deletion vegafusion-core/src/proto/tonic_gen/transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,24 @@ pub struct Stack {
#[prost(string, optional, tag="7")]
pub alias_1: ::core::option::Option<::prost::alloc::string::String>,
}
/// Impute
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Impute {
#[prost(string, tag="1")]
pub field: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub key: ::prost::alloc::string::String,
#[prost(enumeration="ImputeMethod", tag="3")]
pub method: i32,
#[prost(string, repeated, tag="4")]
pub groupby: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, optional, tag="5")]
pub value_json: ::core::option::Option<::prost::alloc::string::String>,
}
/// Top-level transform
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transform {
#[prost(oneof="transform::TransformKind", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11")]
#[prost(oneof="transform::TransformKind", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12")]
pub transform_kind: ::core::option::Option<transform::TransformKind>,
}
/// Nested message and enum types in `Transform`.
Expand Down Expand Up @@ -209,6 +223,8 @@ pub mod transform {
Project(super::Project),
#[prost(message, tag="11")]
Stack(super::Stack),
#[prost(message, tag="12")]
Impute(super::Impute),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -295,3 +311,12 @@ pub enum StackOffset {
Center = 1,
Normalize = 2,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum ImputeMethod {
ImputeValue = 0,
ImputeMean = 1,
ImputeMedian = 2,
ImputeMax = 3,
ImputeMin = 4,
}
18 changes: 18 additions & 0 deletions vegafusion-core/src/proto/transforms.proto
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,23 @@ enum StackOffset {
Normalize = 2;
}

// Impute
message Impute {
string field = 1;
string key = 2;
ImputeMethod method = 3;
repeated string groupby = 4;
optional string value_json = 5;
}

enum ImputeMethod {
ImputeValue = 0;
ImputeMean = 1;
ImputeMedian = 2;
ImputeMax = 3;
ImputeMin = 4;
}

// Top-level transform
message Transform {
oneof transform_kind {
Expand All @@ -220,6 +237,7 @@ message Transform {
Window window = 9;
Project project = 10;
Stack stack = 11;
Impute impute = 12;
}
}

Expand Down
90 changes: 90 additions & 0 deletions vegafusion-core/src/spec/transform/impute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use crate::expression::column_usage::{ColumnUsage, DatasetsColumnUsage, VlSelectionFields};
use crate::spec::transform::{TransformColumns, TransformSpecTrait};
use crate::spec::values::Field;
use crate::task_graph::graph::ScopedVariable;
use crate::task_graph::scope::TaskScope;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ImputeTransformSpec {
pub field: Field,
pub key: Field,

#[serde(skip_serializing_if = "Option::is_none")]
pub keyvals: Option<Value>,

#[serde(skip_serializing_if = "Option::is_none")]
pub method: Option<ImputeMethodSpec>,

#[serde(skip_serializing_if = "Option::is_none")]
pub groupby: Option<Vec<Field>>,

#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<Value>,

#[serde(flatten)]
pub extra: HashMap<String, Value>,
}

impl ImputeTransformSpec {
pub fn method(&self) -> ImputeMethodSpec {
self.method.clone().unwrap_or(ImputeMethodSpec::Value)
}

pub fn groupby(&self) -> Vec<Field> {
self.groupby.clone().unwrap_or_default()
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ImputeMethodSpec {
Value,
Mean,
Median,
Max,
Min,
}

impl TransformSpecTrait for ImputeTransformSpec {
fn supported(&self) -> bool {
self.field.as_().is_none()
&& self.key.as_().is_none()
&& self.keyvals.is_none()
&& self.method() == ImputeMethodSpec::Value
&& self.groupby().len() <= 1
&& self.value.is_some()
}

fn transform_columns(
&self,
datum_var: &Option<ScopedVariable>,
_usage_scope: &[u32],
_task_scope: &TaskScope,
_vl_selection_fields: &VlSelectionFields,
) -> TransformColumns {
if let Some(datum_var) = datum_var {
// Init column usage with field
let mut col_usage = ColumnUsage::from(self.field.field().as_str());

// Add key
col_usage = col_usage.with_column(self.key.field().as_str());

// Add groupby usage
if let Some(groupby) = self.groupby.as_ref() {
let groupby: Vec<_> = groupby.iter().map(|field| field.field()).collect();
col_usage = col_usage.union(&ColumnUsage::from(groupby.as_slice()));
}

// Build produced (no columns are created by impute transform)
let produced = ColumnUsage::empty();

let usage = DatasetsColumnUsage::empty().with_column_usage(datum_var, col_usage);
TransformColumns::PassThrough { usage, produced }
} else {
TransformColumns::Unknown
}
}
}
8 changes: 5 additions & 3 deletions vegafusion-core/src/spec/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod collect;
pub mod extent;
pub mod filter;
pub mod formula;
pub mod impute;
pub mod joinaggregate;
pub mod lookup;
pub mod project;
Expand All @@ -29,6 +30,7 @@ use crate::spec::transform::aggregate::AggregateTransformSpec;
use crate::spec::transform::bin::BinTransformSpec;
use crate::spec::transform::collect::CollectTransformSpec;
use crate::spec::transform::formula::FormulaTransformSpec;
use crate::spec::transform::impute::ImputeTransformSpec;
use crate::spec::transform::joinaggregate::JoinAggregateTransformSpec;
use crate::spec::transform::lookup::LookupTransformSpec;
use crate::spec::transform::project::ProjectTransformSpec;
Expand Down Expand Up @@ -56,7 +58,8 @@ pub enum TransformSpec {
JoinAggregate(JoinAggregateTransformSpec),
Window(WindowTransformSpec),
Project(ProjectTransformSpec),
/**/ Stack(StackTransformSpec),
Stack(StackTransformSpec),
Impute(ImputeTransformSpec),

// Unsupported
CountPattern(CountpatternTransformSpec),
Expand All @@ -75,7 +78,6 @@ pub enum TransformSpec {
Graticule(GraticuleTransformSpec),
Heatmap(HeatmapTransformSpec),
Identifier(IdentifierTransformSpec),
Impute(ImputeTransformSpec),
IsoContour(IsocontourTransformSpec),
Kde(KdeTransformSpec),
Kde2d(Kde2dTransformSpec),
Expand Down Expand Up @@ -115,6 +117,7 @@ impl Deref for TransformSpec {
TransformSpec::Timeunit(t) => t,
TransformSpec::Project(t) => t,
TransformSpec::Stack(t) => t,
TransformSpec::Impute(t) => t,

// Supported for dependency determination, not implementation
TransformSpec::Lookup(t) => t,
Expand All @@ -137,7 +140,6 @@ impl Deref for TransformSpec {
TransformSpec::Graticule(t) => t,
TransformSpec::Heatmap(t) => t,
TransformSpec::Identifier(t) => t,
TransformSpec::Impute(t) => t,
TransformSpec::IsoContour(t) => t,
TransformSpec::JoinAggregate(t) => t,
TransformSpec::Kde(t) => t,
Expand Down
1 change: 0 additions & 1 deletion vegafusion-core/src/spec/transform/unsupported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ unsupported_transforms!(
GraticuleTransformSpec,
HeatmapTransformSpec,
IdentifierTransformSpec,
ImputeTransformSpec,
IsocontourTransformSpec,
KdeTransformSpec,
Kde2dTransformSpec,
Expand Down
50 changes: 50 additions & 0 deletions vegafusion-core/src/transform/impute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use crate::error::Result;
use crate::proto::gen::transforms::{Impute, ImputeMethod};
use crate::spec::transform::impute::{ImputeMethodSpec, ImputeTransformSpec};
use crate::transform::TransformDependencies;

impl Impute {
pub fn try_new(spec: &ImputeTransformSpec) -> Result<Self> {
// Extract method
let method = match spec.method() {
ImputeMethodSpec::Value => ImputeMethod::ImputeValue,
ImputeMethodSpec::Mean => ImputeMethod::ImputeMean,
ImputeMethodSpec::Median => ImputeMethod::ImputeMedian,
ImputeMethodSpec::Max => ImputeMethod::ImputeMax,
ImputeMethodSpec::Min => ImputeMethod::ImputeMin,
};

// Extract field
let field = spec.field.field();

// Extract key
let key = spec.key.field();

// Extract groupby
let groupby: Vec<_> = spec
.groupby
.clone()
.unwrap_or_default()
.iter()
.map(|field| field.field())
.collect();

// Extract Value
let value_json = spec
.value
.as_ref()
.map(|value| serde_json::to_string(value).unwrap());

// keyvals not yet supported

Ok(Impute {
field,
key,
method: method as i32,
groupby,
value_json,
})
}
}

impl TransformDependencies for Impute {}
5 changes: 4 additions & 1 deletion vegafusion-core/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::error::VegaFusionError;
use crate::proto::gen::tasks::Variable;
use crate::proto::gen::transforms::transform::TransformKind;
use crate::proto::gen::transforms::{
Aggregate, Bin, Collect, Extent, Filter, Formula, Project, Stack, TimeUnit,
Aggregate, Bin, Collect, Extent, Filter, Formula, Impute, Project, Stack, TimeUnit,
};
use crate::proto::gen::transforms::{JoinAggregate, Transform, Window};
use crate::spec::transform::TransformSpec;
Expand All @@ -23,6 +23,7 @@ pub mod collect;
pub mod extent;
pub mod filter;
pub mod formula;
pub mod impute;
pub mod joinaggregate;
pub mod pipeline;
pub mod project;
Expand All @@ -48,6 +49,7 @@ impl TryFrom<&TransformSpec> for TransformKind {
TransformSpec::Window(tx_spec) => Self::Window(Window::try_new(tx_spec)?),
TransformSpec::Project(tx_spec) => Self::Project(Project::try_new(tx_spec)?),
TransformSpec::Stack(tx_spec) => Self::Stack(Stack::try_new(tx_spec)?),
TransformSpec::Impute(tx_spec) => Self::Impute(Impute::try_new(tx_spec)?),
_ => {
return Err(VegaFusionError::parse(&format!(
"Unsupported transform: {:?}",
Expand Down Expand Up @@ -82,6 +84,7 @@ impl TransformKind {
TransformKind::Window(tx) => tx,
TransformKind::Project(tx) => tx,
TransformKind::Stack(tx) => tx,
TransformKind::Impute(tx) => tx,
}
}
}
Expand Down
Loading

0 comments on commit a3000c2

Please sign in to comment.