Skip to content

Commit

Permalink
Revamp grpc support (#522)
Browse files Browse the repository at this point in the history
* Add grpc VegaFusionRuntimeTraim implementation, use from Python

* include inline datasets in query_request

* fix tests

* fix tests

* Add unimplemented stubs in wasm implementation

* fmt

* Move methods up to VegaFusionRuntimeTrait

* fmt/fix

* handle inline datasets in vegafusion-wasm

* warning / format

* fix requested indices

* fix python lint

* fix expected error message

* relock, mypy fixes

* bring back ignores for CI?
  • Loading branch information
jonmmease authored Oct 19, 2024
1 parent aa2817a commit 96ca282
Show file tree
Hide file tree
Showing 33 changed files with 5,480 additions and 6,963 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ lazy_static = { version = "1.5" }
async-trait = "0.1.73"
futures = "0.3.21"

[workspace.dependencies.serde_json]
version = "1.0.91"
default-features = false

[workspace.dependencies.datafusion]
version = "42.0.0"

Expand Down
9,337 changes: 3,769 additions & 5,568 deletions pixi.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ flaky = "3.7.0.*"
vega_datasets = "0.9.0.*"
jupytext = "1.15.0.*"
openjdk = "20.0.0.*"
build = "0.7.0.*"
minio-server = "2023.9.23.3.47.50.*"
minio = "7.1.17.*"
rust = "1.80.*"
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ workspace = true
optional = true

[dependencies.serde_json]
version = "1.0.91"
workspace = true
default-features = false
optional = true

Expand Down
3 changes: 2 additions & 1 deletion vegafusion-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ chrono = "0.4.23"
num-complex = "0.4.2"
rand = "0.8.5"
json-patch = "1.0.0"
async-mutex = "1.4.0"

[dependencies.lazy_static]
workspace = true
Expand All @@ -44,7 +45,7 @@ workspace = true
optional = true

[dependencies.serde_json]
version = "1.0.91"
workspace = true
features = ["preserve_order"]

[dependencies.vegafusion-common]
Expand Down
24 changes: 4 additions & 20 deletions vegafusion-core/src/chart_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,10 @@ impl ChartState {

let mut init = Vec::new();
for response_value in response_task_values {
let variable = response_value
.variable
.with_context(|| "Missing variable for response value".to_string())?;
let variable = response_value.variable;

let scope = response_value.scope;
let proto_value = response_value
.value
.with_context(|| "Missing value for response value".to_string())?;

let value = TaskValue::try_from(&proto_value).with_context(|| {
"Deserialization failed for value of response value".to_string()
})?;
let value = response_value.value;

init.push(ExportUpdateArrow {
namespace: ExportUpdateNamespace::try_from(variable.ns()).unwrap(),
Expand Down Expand Up @@ -176,18 +168,10 @@ impl ChartState {
let mut response_updates = response_task_values
.into_iter()
.map(|response_value| {
let variable = response_value
.variable
.with_context(|| "Missing variable for response value".to_string())?;
let variable = response_value.variable;

let scope = response_value.scope;
let proto_value = response_value
.value
.with_context(|| "missing value for response value: {:?}".to_string())?;

let value = TaskValue::try_from(&proto_value).with_context(|| {
"Deserialization failed for value of response value: {:?}".to_string()
})?;
let value = response_value.value;

Ok(ExportUpdateJSON {
namespace: match variable.ns() {
Expand Down
38 changes: 21 additions & 17 deletions vegafusion-core/src/proto/pretransform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ import "tasks.proto";
/// Pre transform spec messages
message PreTransformSpecOpts {
optional uint32 row_limit = 1;
repeated tasks.InlineDataset inline_datasets = 2;
bool preserve_interactivity = 3;
repeated PreTransformVariable keep_variables = 4;
bool preserve_interactivity = 2;
repeated PreTransformVariable keep_variables = 3;
string local_tz = 4;
optional string default_input_tz = 5;
}

message PreTransformSpecRequest {
string spec = 1;
string local_tz = 2;
optional string output_tz = 3;
PreTransformSpecOpts opts = 4;
repeated tasks.InlineDataset inline_datasets = 2;
PreTransformSpecOpts opts = 3;
}

message PreTransformSpecResponse {
Expand Down Expand Up @@ -50,15 +50,15 @@ message PreTransformVariable {

message PreTransformValuesOpts {
repeated PreTransformVariable variables = 1;
repeated tasks.InlineDataset inline_datasets = 2;
optional uint32 row_limit = 3;
optional uint32 row_limit = 2;
string local_tz = 3;
optional string default_input_tz = 4;
}

message PreTransformValuesRequest {
string spec = 1;
string local_tz = 2;
optional string default_input_tz = 3;
PreTransformValuesOpts opts = 4;
repeated tasks.InlineDataset inline_datasets = 2;
PreTransformValuesOpts opts = 3;
}

message PreTransformValuesResponse {
Expand Down Expand Up @@ -90,6 +90,14 @@ message PreTransformExtractDataset {
bytes table = 3;
}

message PreTransformExtractOpts {
string local_tz = 1;
optional string default_input_tz = 2;
bool preserve_interactivity = 3;
int32 extract_threshold = 4;
repeated PreTransformVariable keep_variables = 5;
}

message PreTransformExtractWarning {
oneof warning_type {
PlannerWarning planner = 1;
Expand All @@ -104,10 +112,6 @@ message PreTransformExtractResponse {

message PreTransformExtractRequest {
string spec = 1;
string local_tz = 2;
optional string default_input_tz = 3;
bool preserve_interactivity = 4;
int32 extract_threshold = 5;
repeated tasks.InlineDataset inline_datasets = 6;
repeated PreTransformVariable keep_variables = 7;
repeated tasks.InlineDataset inline_datasets = 2;
PreTransformExtractOpts opts = 3;
}
62 changes: 34 additions & 28 deletions vegafusion-core/src/proto/prost_gen/pretransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
pub struct PreTransformSpecOpts {
#[prost(uint32, optional, tag = "1")]
pub row_limit: ::core::option::Option<u32>,
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(bool, tag = "3")]
#[prost(bool, tag = "2")]
pub preserve_interactivity: bool,
#[prost(message, repeated, tag = "4")]
#[prost(message, repeated, tag = "3")]
pub keep_variables: ::prost::alloc::vec::Vec<PreTransformVariable>,
#[prost(string, tag = "4")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "5")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PreTransformSpecRequest {
#[prost(string, tag = "1")]
pub spec: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "3")]
pub output_tz: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, optional, tag = "4")]
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(message, optional, tag = "3")]
pub opts: ::core::option::Option<PreTransformSpecOpts>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -82,21 +82,21 @@ pub struct PreTransformVariable {
pub struct PreTransformValuesOpts {
#[prost(message, repeated, tag = "1")]
pub variables: ::prost::alloc::vec::Vec<PreTransformVariable>,
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(uint32, optional, tag = "3")]
#[prost(uint32, optional, tag = "2")]
pub row_limit: ::core::option::Option<u32>,
#[prost(string, tag = "3")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "4")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PreTransformValuesRequest {
#[prost(string, tag = "1")]
pub spec: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "3")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, optional, tag = "4")]
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(message, optional, tag = "3")]
pub opts: ::core::option::Option<PreTransformValuesOpts>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -147,6 +147,20 @@ pub struct PreTransformExtractDataset {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PreTransformExtractOpts {
#[prost(string, tag = "1")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "2")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
#[prost(bool, tag = "3")]
pub preserve_interactivity: bool,
#[prost(int32, tag = "4")]
pub extract_threshold: i32,
#[prost(message, repeated, tag = "5")]
pub keep_variables: ::prost::alloc::vec::Vec<PreTransformVariable>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PreTransformExtractWarning {
#[prost(oneof = "pre_transform_extract_warning::WarningType", tags = "1")]
pub warning_type: ::core::option::Option<pre_transform_extract_warning::WarningType>,
Expand Down Expand Up @@ -175,16 +189,8 @@ pub struct PreTransformExtractResponse {
pub struct PreTransformExtractRequest {
#[prost(string, tag = "1")]
pub spec: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "3")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
#[prost(bool, tag = "4")]
pub preserve_interactivity: bool,
#[prost(int32, tag = "5")]
pub extract_threshold: i32,
#[prost(message, repeated, tag = "6")]
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(message, repeated, tag = "7")]
pub keep_variables: ::prost::alloc::vec::Vec<PreTransformVariable>,
#[prost(message, optional, tag = "3")]
pub opts: ::core::option::Option<PreTransformExtractOpts>,
}
62 changes: 34 additions & 28 deletions vegafusion-core/src/proto/tonic_gen/pretransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
pub struct PreTransformSpecOpts {
#[prost(uint32, optional, tag = "1")]
pub row_limit: ::core::option::Option<u32>,
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(bool, tag = "3")]
#[prost(bool, tag = "2")]
pub preserve_interactivity: bool,
#[prost(message, repeated, tag = "4")]
#[prost(message, repeated, tag = "3")]
pub keep_variables: ::prost::alloc::vec::Vec<PreTransformVariable>,
#[prost(string, tag = "4")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "5")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PreTransformSpecRequest {
#[prost(string, tag = "1")]
pub spec: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "3")]
pub output_tz: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, optional, tag = "4")]
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(message, optional, tag = "3")]
pub opts: ::core::option::Option<PreTransformSpecOpts>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -82,21 +82,21 @@ pub struct PreTransformVariable {
pub struct PreTransformValuesOpts {
#[prost(message, repeated, tag = "1")]
pub variables: ::prost::alloc::vec::Vec<PreTransformVariable>,
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(uint32, optional, tag = "3")]
#[prost(uint32, optional, tag = "2")]
pub row_limit: ::core::option::Option<u32>,
#[prost(string, tag = "3")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "4")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PreTransformValuesRequest {
#[prost(string, tag = "1")]
pub spec: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "3")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, optional, tag = "4")]
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(message, optional, tag = "3")]
pub opts: ::core::option::Option<PreTransformValuesOpts>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -147,6 +147,20 @@ pub struct PreTransformExtractDataset {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PreTransformExtractOpts {
#[prost(string, tag = "1")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "2")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
#[prost(bool, tag = "3")]
pub preserve_interactivity: bool,
#[prost(int32, tag = "4")]
pub extract_threshold: i32,
#[prost(message, repeated, tag = "5")]
pub keep_variables: ::prost::alloc::vec::Vec<PreTransformVariable>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct PreTransformExtractWarning {
#[prost(oneof = "pre_transform_extract_warning::WarningType", tags = "1")]
pub warning_type: ::core::option::Option<pre_transform_extract_warning::WarningType>,
Expand Down Expand Up @@ -175,16 +189,8 @@ pub struct PreTransformExtractResponse {
pub struct PreTransformExtractRequest {
#[prost(string, tag = "1")]
pub spec: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub local_tz: ::prost::alloc::string::String,
#[prost(string, optional, tag = "3")]
pub default_input_tz: ::core::option::Option<::prost::alloc::string::String>,
#[prost(bool, tag = "4")]
pub preserve_interactivity: bool,
#[prost(int32, tag = "5")]
pub extract_threshold: i32,
#[prost(message, repeated, tag = "6")]
#[prost(message, repeated, tag = "2")]
pub inline_datasets: ::prost::alloc::vec::Vec<super::tasks::InlineDataset>,
#[prost(message, repeated, tag = "7")]
pub keep_variables: ::prost::alloc::vec::Vec<PreTransformVariable>,
#[prost(message, optional, tag = "3")]
pub opts: ::core::option::Option<PreTransformExtractOpts>,
}
Loading

0 comments on commit 96ca282

Please sign in to comment.