Skip to content

Commit

Permalink
Add VegaFusion data transformer with mime renderer, save, and to_dict…
Browse files Browse the repository at this point in the history
…/to_json integration (#3094)

* Add vegafusion data transformer

* Replace vegafusion data transformer in transformed_data with Altair's

* Add vegafusion mimebundle test

* Use vegafusion+dataset:// protocol

* Use VEGA_VERSION to build schema

* Document reason for local imports

* Perform pre-transform in to_dict/to_json

Raise a ValueError when the "vegafusion" transformer is enabled and format="vega-lite".

Use context={"pre_transform": False} to disable pre_transforming when "vegafusion" is enabled, for internal usage.

* Handle composite charts
  • Loading branch information
jonmmease authored Jul 8, 2023
1 parent 2734301 commit ae8d57b
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 31 deletions.
9 changes: 5 additions & 4 deletions altair/utils/_transformed_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
ConcatChart,
data_transformers,
)
from altair.utils._vegafusion_data import get_inline_tables
from altair.utils.core import _DataFrameLike
from altair.utils.schemapi import Undefined

Expand Down Expand Up @@ -57,7 +58,7 @@ def transformed_data(chart, row_limit=None, exclude=None):
transformed data
"""
try:
from vegafusion import runtime, get_local_tz, get_inline_datasets_for_spec # type: ignore
from vegafusion import runtime, get_local_tz # type: ignore
except ImportError as err:
raise ImportError(
"transformed_data requires the vegafusion-python-embed and vegafusion packages\n"
Expand All @@ -80,9 +81,9 @@ def transformed_data(chart, row_limit=None, exclude=None):
chart_names = name_views(chart, 0, exclude=exclude)

# Compile to Vega and extract inline DataFrames
with data_transformers.enable("vegafusion-inline"):
vega_spec = chart.to_dict(format="vega")
inline_datasets = get_inline_datasets_for_spec(vega_spec)
with data_transformers.enable("vegafusion"):
vega_spec = chart.to_dict(format="vega", context={"pre_transform": False})
inline_datasets = get_inline_tables(vega_spec)

# Build mapping from mark names to vega datasets
facet_mapping = get_facet_mapping(vega_spec)
Expand Down
201 changes: 201 additions & 0 deletions altair/utils/_vegafusion_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import sys
from toolz import curried
import uuid
from weakref import WeakValueDictionary

from typing import Union, Dict, Set, MutableMapping

if sys.version_info >= (3, 8):
from typing import TypedDict, Final
else:
from typing_extensions import TypedDict, Final

from altair.utils.core import _DataFrameLike
from altair.utils.data import _DataType, _ToValuesReturnType, MaxRowsError
from altair.vegalite.data import default_data_transformer

# Temporary storage for dataframes that have been extracted
# from charts by the vegafusion data transformer. Use a WeakValueDictionary
# rather than a dict so that the Python interpreter is free to garbage
# collect the stored DataFrames.
# Entries are added by vegafusion_data_transformer() under a generated
# table name and are popped again by get_inline_tables().
extracted_inline_tables: MutableMapping[str, _DataFrameLike] = WeakValueDictionary()

# Special URL prefix that VegaFusion uses to denote that a
# dataset in a Vega spec corresponds to an entry in the `inline_datasets`
# kwarg of vf.runtime.pre_transform_spec().
# The text following the prefix is the key into extracted_inline_tables.
VEGAFUSION_PREFIX: Final = "vegafusion+dataset://"


class _ToVegaFusionReturnUrlDict(TypedDict):
    """Shape of the dict returned by the VegaFusion data transformer
    when a dataframe has been extracted from the chart.
    """

    # URL made of VEGAFUSION_PREFIX followed by the generated table name
    url: str


@curried.curry
def vegafusion_data_transformer(
    data: _DataType, max_rows: int = 100000
) -> Union[_ToVegaFusionReturnUrlDict, _ToValuesReturnType]:
    """VegaFusion Data Transformer

    Objects that expose ``__dataframe__`` (and do not expose
    ``__geo_interface__``) are stored in ``extracted_inline_tables`` under a
    freshly generated table name and replaced in the spec by a URL reference
    built from ``VEGAFUSION_PREFIX``. Every other kind of data is handed to
    ``default_data_transformer`` unchanged.

    Parameters
    ----------
    data
        The chart data to transform.
    max_rows: int
        Not referenced in this function body.
        NOTE(review): the row limit appears to be enforced later, in
        ``compile_with_vegafusion``, which reads the ``max_rows`` transformer
        option -- confirm.

    Returns
    -------
    dict
        ``{"url": VEGAFUSION_PREFIX + table_name}`` for extracted dataframes,
        otherwise whatever ``default_data_transformer`` returns.
    """
    # Geo interface objects (e.g. a geopandas GeoDataFrame) and any data type
    # we don't recognize both go through the default transformer.
    is_geo = hasattr(data, "__geo_interface__")
    if is_geo or not hasattr(data, "__dataframe__"):
        return default_data_transformer(data)

    # Hyphens in the UUID are replaced so the name only contains underscores
    table_name = "table_" + str(uuid.uuid4()).replace("-", "_")
    extracted_inline_tables[table_name] = data
    return {"url": VEGAFUSION_PREFIX + table_name}


def get_inline_table_names(vega_spec: dict) -> Set[str]:
    """Get a set of the inline datasets names in the provided Vega spec

    Inline datasets are encoded as URLs that start with the
    ``vegafusion+dataset://`` prefix (see ``VEGAFUSION_PREFIX``).

    Parameters
    ----------
    vega_spec: dict
        A Vega specification dict

    Returns
    -------
    set of str
        Set of the names of the inline datasets that are referenced
        in the specification.

    Examples
    --------
    >>> spec = {
    ...     "data": [
    ...         {
    ...             "name": "foo",
    ...             "url": "https://path/to/file.csv"
    ...         },
    ...         {
    ...             "name": "bar",
    ...             "url": "vegafusion+dataset://inline_dataset_123"
    ...         }
    ...     ]
    ... }
    >>> get_inline_table_names(spec)
    {'inline_dataset_123'}
    """
    table_names = set()

    # Process datasets
    for data in vega_spec.get("data", []):
        url = data.get("url")
        # A dataset's url is not guaranteed to be a plain string (it may be
        # absent, or an object such as a signal reference); only string URLs
        # can carry the VegaFusion prefix, so skip everything else instead
        # of failing on ``.startswith``.
        if isinstance(url, str) and url.startswith(VEGAFUSION_PREFIX):
            table_names.add(url[len(VEGAFUSION_PREFIX) :])

    # Recursively process child marks, which may have their own datasets
    for mark in vega_spec.get("marks", []):
        table_names.update(get_inline_table_names(mark))

    return table_names


def get_inline_tables(vega_spec: dict) -> Dict[str, _DataFrameLike]:
    """Collect the extracted dataframes that a Vega specification refers to

    Note: This function should only be called on a Vega spec that corresponds
    to a chart that was processed by the vegafusion_data_transformer.
    Furthermore, this function may only be called once per spec because
    each returned dataframe is popped from ``extracted_inline_tables``.

    Parameters
    ----------
    vega_spec: dict
        A Vega specification dict

    Returns
    -------
    dict from str to dataframe
        dict from inline dataset name to dataframe object. Names that are
        referenced in the spec but have no stored dataframe are left out.
    """
    # Unique marker so that a missing entry can be told apart from any
    # stored value without relying on exception handling.
    not_found = object()

    found = {}
    for name in get_inline_table_names(vega_spec):
        frame = extracted_inline_tables.pop(name, not_found)
        if frame is not_found:
            # named dataset that was provided by the user
            continue
        found[name] = frame
    return found


def compile_with_vegafusion(vegalite_spec: dict) -> dict:
    """Compile a Vega-Lite spec to Vega and pre-transform with VegaFusion

    Note: This function should only be called on a Vega-Lite spec
    that was generated with the "vegafusion" data transformer enabled.
    In particular, this spec may contain references to extracted datasets
    using URLs that start with ``VEGAFUSION_PREFIX``.

    Parameters
    ----------
    vegalite_spec: dict
        A Vega-Lite spec that was generated from an Altair chart with
        the "vegafusion" data transformer enabled

    Returns
    -------
    dict
        A Vega spec that has been pre-transformed by VegaFusion

    Raises
    ------
    ImportError
        If the vegafusion package cannot be imported.
    ValueError
        If no Vega-Lite compiler plugin is active.
    MaxRowsError
        If VegaFusion reports a "RowLimitExceeded" warning.
    """
    # Local import to avoid circular ImportError
    from altair import vegalite_compilers, data_transformers

    try:
        import vegafusion as vf  # type: ignore
    except ImportError as err:
        raise ImportError(
            'The "vegafusion" data transformer requires the vegafusion-python-embed\n'
            "and vegafusion packages. These can be installed with pip using:\n"
            ' pip install "vegafusion[embed]"\n'
            "Or with conda using:\n"
            " conda install -c conda-forge vegafusion-python-embed vegafusion"
        ) from err

    # Step 1: Vega-Lite -> Vega using the active compiler plugin
    to_vega = vegalite_compilers.get()
    if to_vega is None:
        raise ValueError("No active vega-lite compiler plugin found")
    vega_spec = to_vega(vegalite_spec)

    # Step 2: gather the dataframes the compiled spec refers to
    inline_tables = get_inline_tables(vega_spec)

    # Step 3: let VegaFusion evaluate the transforms ahead of time
    row_limit = data_transformers.options.get("max_rows", None)
    local_tz = vf.get_local_tz()
    pre_transformed, warnings = vf.runtime.pre_transform_spec(
        vega_spec,
        local_tz,
        inline_datasets=inline_tables,
        row_limit=row_limit,
    )

    # Step 4: a row limit warning is surfaced as a MaxRowsError
    limit_exceeded = any(w.get("type") == "RowLimitExceeded" for w in warnings)
    if limit_exceeded:
        raise MaxRowsError(
            "The number of dataset rows after filtering and aggregation exceeds\n"
            f"the current limit of {row_limit}. Try adding an aggregation to reduce\n"
            "the size of the dataset that must be loaded into the browser. Or, disable\n"
            "the limit by calling alt.data_transformers.disable_max_rows(). Note that\n"
            "disabling this limit may cause the browser to freeze or crash."
        )

    return pre_transformed


def using_vegafusion() -> bool:
    """Check whether the vegafusion data transformer is enabled

    Returns
    -------
    bool
        True when the name of the active data transformer is "vegafusion".
    """
    # Local import to avoid circular ImportError
    from altair import data_transformers

    active_name = data_transformers.active
    return active_name == "vegafusion"
12 changes: 12 additions & 0 deletions altair/utils/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Callable, Dict, Optional, Tuple, Any, Union
import uuid

from ._vegafusion_data import compile_with_vegafusion, using_vegafusion
from .plugin_registry import PluginRegistry, PluginEnabler
from .mimebundle import spec_to_mimebundle
from .schemapi import validate_jsonschema
Expand Down Expand Up @@ -161,10 +162,21 @@ def default_renderer_base(
This renderer works with modern frontends (JupyterLab, nteract) that know
how to render the custom VegaLite MIME type listed above.
"""
# Local import to avoid circular ImportError
from altair.vegalite.v5.display import VEGA_MIME_TYPE, VEGALITE_MIME_TYPE

assert isinstance(spec, dict)
bundle: Dict[str, Union[str, dict]] = {}
metadata: Dict[str, Dict[str, Any]] = {}

if using_vegafusion():
spec = compile_with_vegafusion(spec)

# Swap mimetype from Vega-Lite to Vega.
# If mimetype was JSON, leave it alone
if mime_type == VEGALITE_MIME_TYPE:
mime_type = VEGA_MIME_TYPE

bundle[mime_type] = spec
bundle["text/plain"] = str_repr
if options:
Expand Down
35 changes: 27 additions & 8 deletions altair/utils/mimebundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,16 @@ def spec_to_mimebundle(
----
The png, svg, pdf, and vega outputs require the altair_saver package
"""
# Local import to avoid circular ImportError
from altair.utils.display import compile_with_vegafusion, using_vegafusion

if mode != "vega-lite":
raise ValueError("mode must be 'vega-lite'")

if using_vegafusion():
spec = compile_with_vegafusion(spec)
mode = "vega"

if format in ["png", "svg", "pdf", "vega"]:
return _spec_to_mimebundle_with_engine(
spec, format, mode, engine=engine, **kwargs
Expand Down Expand Up @@ -82,7 +89,7 @@ def _spec_to_mimebundle_with_engine(spec, format, mode, **kwargs):
a dictionary representing a vega-lite plot spec
format : string {'png', 'svg', 'pdf', 'vega'}
the format of the mimebundle to be returned
mode : string {'vega-lite'}
mode : string {'vega-lite', 'vega'}
The rendering mode.
engine: string {'vl-convert', 'altair_saver'}
the conversion engine to use
Expand All @@ -102,17 +109,29 @@ def _spec_to_mimebundle_with_engine(spec, format, mode, **kwargs):
# from SCHEMA_VERSION (of the form 'v5.2.0')
vl_version = "_".join(SCHEMA_VERSION.split(".")[:2])
if format == "vega":
vg = vlc.vegalite_to_vega(spec, vl_version=vl_version)
if mode == "vega":
vg = spec
else:
vg = vlc.vegalite_to_vega(spec, vl_version=vl_version)
return {"application/vnd.vega.v5+json": vg}
elif format == "svg":
svg = vlc.vegalite_to_svg(spec, vl_version=vl_version)
if mode == "vega":
svg = vlc.vega_to_svg(spec)
else:
svg = vlc.vegalite_to_svg(spec, vl_version=vl_version)
return {"image/svg+xml": svg}
elif format == "png":
png = vlc.vegalite_to_png(
spec,
vl_version=vl_version,
scale=kwargs.get("scale_factor", 1.0),
)
if mode == "vega":
png = vlc.vega_to_png(
spec,
scale=kwargs.get("scale_factor", 1),
)
else:
png = vlc.vegalite_to_png(
spec,
vl_version=vl_version,
scale=kwargs.get("scale_factor", 1),
)
return {"image/png": png}
else:
# This should be validated above
Expand Down
33 changes: 23 additions & 10 deletions altair/utils/save.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .mimebundle import spec_to_mimebundle
from ..vegalite.v5.data import data_transformers
from altair.utils._vegafusion_data import using_vegafusion


def write_file_or_filename(fp, content, mode="w", encoding=None):
Expand Down Expand Up @@ -122,15 +123,12 @@ def save(

format = set_inspect_format_argument(format, fp, inline)

# Temporarily turn off any data transformers so that all data is inlined
# when calling chart.to_dict. This is relevant for vl-convert which cannot access
# local json files which could be created by a json data transformer. Furthermore,
# we don't exit the with statement until this function completed due to the issue
# described at https://github.com/vega/vl-convert/issues/31
with data_transformers.enable("default"), data_transformers.disable_max_rows():
spec = chart.to_dict()
def perform_save():
spec = chart.to_dict(context={"pre_transform": False})

mode = set_inspect_mode_argument(mode, embed_options, spec, vegalite_version)
inner_mode = set_inspect_mode_argument(
mode, embed_options, spec, vegalite_version
)

if format == "json":
json_spec = json.dumps(spec, **json_kwds)
Expand All @@ -141,7 +139,7 @@ def save(
mimebundle = spec_to_mimebundle(
spec=spec,
format=format,
mode=mode,
mode=inner_mode,
vega_version=vega_version,
vegalite_version=vegalite_version,
vegaembed_version=vegaembed_version,
Expand All @@ -154,7 +152,7 @@ def save(
mimebundle = spec_to_mimebundle(
spec=spec,
format=format,
mode=mode,
mode=inner_mode,
vega_version=vega_version,
vegalite_version=vegalite_version,
vegaembed_version=vegaembed_version,
Expand All @@ -174,3 +172,18 @@ def save(
)
else:
raise ValueError("Unsupported format: '{}'".format(format))

if using_vegafusion():
# When the vegafusion data transformer is enabled, transforms will be
# evaluated during save and the resulting data will be included in the
# vega specification that is saved.
with data_transformers.disable_max_rows():
perform_save()
else:
# Temporarily turn off any data transformers so that all data is inlined
# when calling chart.to_dict. This is relevant for vl-convert which cannot access
# local json files which could be created by a json data transformer. Furthermore,
# we don't exit the with statement until this function completed due to the issue
# described at https://github.com/vega/vl-convert/issues/31
with data_transformers.enable("default"), data_transformers.disable_max_rows():
perform_save()
2 changes: 1 addition & 1 deletion altair/vegalite/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class DataTransformerRegistry(_DataTransformerRegistry):
def disable_max_rows(self) -> PluginEnabler:
"""Disable the MaxRowsError."""
options = self.options
if self.active == "default":
if self.active in ("default", "vegafusion"):
options = options.copy()
options["max_rows"] = None
return self.enable(**options)
Expand Down
Loading

0 comments on commit ae8d57b

Please sign in to comment.