Skip to content

Commit

Permalink
Add new Derived stream class (#4532)
Browse files Browse the repository at this point in the history
* Add new Derived stream class

that generalizes the concept of having a stream that produces values based on one or more other streams.
Update SelectionExpr stream to be a Derived stream subclass.

* Fix selection tests

* trigger_index constant

* Remove trigger_index, replace with exclusive setting in Derived

* Stream param must be constant
  • Loading branch information
jonmmease authored Aug 10, 2020
1 parent b5b9b05 commit f075725
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 78 deletions.
4 changes: 2 additions & 2 deletions holoviews/selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def _build_selection_streams(cls, inst):
"""
raise NotImplementedError()

def _expr_stream_updated(self, hvobj, selection_expr, bbox, region_element):
def _expr_stream_updated(self, hvobj, selection_expr, bbox, region_element, **kwargs):
"""
Called when one of the registered HoloViews objects produces a new
selection expression. Subclasses should override this method, and
Expand Down Expand Up @@ -291,7 +291,7 @@ def selected_cmap(self):
"""
return None if self.selected_color is None else _color_to_cmap(self.selected_color)

def _expr_stream_updated(self, hvobj, selection_expr, bbox, region_element):
def _expr_stream_updated(self, hvobj, selection_expr, bbox, region_element, **kwargs):
if selection_expr:
if self.cross_filter_mode == "overwrite":
# clear other regions and selections
Expand Down
177 changes: 137 additions & 40 deletions holoviews/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,9 +780,82 @@ def __init__(self, parameterized, parameters=None, watch=True, **params):
super(ParamMethod, self).__init__(parameterized, parameters, watch, **params)


class Derived(Stream):
"""
A Stream that watches the parameters of one or more input streams and produces
a result that is a pure function of the input stream values.
If exclusive=True, then all streams except the most recently updated are cleared.
"""
def __init__(self, input_streams, exclusive=False, **params):
super(Derived, self).__init__(**params)
self.input_streams = input_streams
self.exclusive = exclusive
self._register_input_streams()
self.update()

def _register_input_streams(self):
"""
Register callbacks to watch for changes to input streams
"""
for i, stream in enumerate(self.input_streams):
def perform_update(stream_index=i, **kwargs):
# If exlcusive, reset other stream values before triggering event
if self.exclusive:
for j, input_stream in enumerate(self.input_streams):
if stream_index != j:
input_stream.reset()
self.event()
stream.add_subscriber(perform_update)

def _unregister_input_streams(self):
"""
Unregister callbacks on input streams and clear input streams list
"""
for stream in self.input_streams:
stream.source = None
stream.clear()
self.input_streams.clear()

class SelectionExpr(Stream):
@property
def constants(self):
"""
Dict of constants for this instance that should be passed to transform_function
Constant values must not change in response to changes in the values of the
input streams. They may, however, change in response to other stream property
updates. For example, these values may change if the Stream's source element
changes
"""
return {}

def transform(self):
stream_values = [s.contents for s in self.input_streams]
return self.transform_function(stream_values, self.constants)

@classmethod
def transform_function(cls, stream_values, constants):
"""
Pure function that transforms input stream param values into the param values
of this Derived stream.
Args:
stream_values: list of dict
Current values of the stream params for each input_stream
constants: dict
Constants as returned by the constants property of an instance of this
stream type.
Returns: dict
dict of new Stream values where the keys match this stream's params
"""
raise NotImplementedError

def __del__(self):
self._unregister_input_streams()


class SelectionExpr(Derived):
selection_expr = param.Parameter(default=None, constant=True)

bbox = param.Dict(default=None, constant=True)
Expand All @@ -799,69 +872,93 @@ def __init__(self, source, **params):
if isinstance(source, DynamicMap):
initialize_dynamic(source)

if ((isinstance(source, DynamicMap) and issubclass(source.type, Element)) or
isinstance(source, Element)):
self._source_streams = []
super(SelectionExpr, self).__init__(source=source, **params)
self._register_chart(source)
else:
if not ((isinstance(source, DynamicMap) and issubclass(source.type, Element))
or isinstance(source, Element)):
raise ValueError(
"The source of SelectionExpr must be an instance of an "
"Element subclass or a DynamicMap that returns such an "
"instance. Received value of type {typ}: {val}".format(
typ=type(source), val=source)
)

def _register_chart(self, hvobj):
from .core.spaces import DynamicMap
input_streams = self._build_selection_streams(source)
super(SelectionExpr, self).__init__(
source=source, input_streams=input_streams, exclusive=True, **params
)

if isinstance(hvobj, DynamicMap):
element_type = hvobj.type
def _build_selection_streams(self, source):
from holoviews.core.spaces import DynamicMap
if isinstance(source, DynamicMap):
element_type = source.type
else:
element_type = hvobj
element_type = source
input_streams = [
stream(source=source) for stream in element_type._selection_streams
]
return input_streams

selection_streams = element_type._selection_streams
@property
def constants(self):
return {
"source": self.source,
"index_cols": self._index_cols
}

def _set_expr(**params):
if isinstance(hvobj, DynamicMap):
element = hvobj.values()[-1]
else:
element = hvobj
params = dict(params, index_cols=self._index_cols)
@classmethod
def transform_function(cls, stream_values, constants):
from holoviews.core.spaces import DynamicMap

hvobj = constants["source"]
if isinstance(hvobj, DynamicMap):
element = hvobj.values()[-1]
else:
element = hvobj

selection_expr = None
bbox = None
region_element = None
for stream_value in stream_values:
params = dict(stream_value, index_cols=constants["index_cols"])
selection_expr, bbox, region_element = \
element._get_selection_expr_for_stream_value(**params)
for expr_transform in element._transforms[::-1]:
if selection_expr is not None:
selection_expr = expr_transform(selection_expr)

self.event(
selection_expr=selection_expr,
bbox=bbox,
region_element=region_element,
)
if selection_expr is not None:
break

for stream_type in selection_streams:
stream = stream_type(source=hvobj)
self._source_streams.append(stream)
stream.add_subscriber(_set_expr)
for expr_transform in element._transforms[::-1]:
if selection_expr is not None:
selection_expr = expr_transform(selection_expr)

def _unregister_chart(self):
for stream in self._source_streams:
stream.source = None
stream.clear()
self._source_streams.clear()
return dict(
selection_expr=selection_expr,
bbox=bbox,
region_element=region_element,
)

@property
def source(self):
return Stream.source.fget(self)

@source.setter
def source(self, value):
self._unregister_chart()
# Unregister old selection streams
self._unregister_input_streams()

# Set new source
Stream.source.fset(self, value)

def __del__(self):
self._unregister_chart()
# Build selection input streams for new source element
self.input_streams = self._build_selection_streams(self.source)

# Clear current selection expression state
self.update(
selection_expr=None,
bbox=None,
region_element=None,
)

# Register callbacks on input streams
self._register_input_streams()


class LinkedStream(Stream):
"""
Expand Down
18 changes: 9 additions & 9 deletions holoviews/tests/testselection.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def test_points_selection(self, dynamic=False, show_regions=True):
self.check_overlay_points_like(selected, lnk_sel, self.data)

# Perform selection of second and third point
boundsxy = lnk_sel._selection_expr_streams[0]._source_streams[0]
boundsxy = lnk_sel._selection_expr_streams[0].input_streams[0]
self.assertIsInstance(boundsxy, hv.streams.SelectionXY)
boundsxy.event(bounds=(0, 1, 5, 5))
unselected, selected, region, region2 = linked[()].values()
Expand Down Expand Up @@ -136,7 +136,7 @@ def test_layout_selection_points_table(self):
)

# Select first and third point
boundsxy = lnk_sel._selection_expr_streams[0]._source_streams[0]
boundsxy = lnk_sel._selection_expr_streams[0].input_streams[0]
boundsxy.event(bounds=(0, 0, 4, 2))
current_obj = linked[()]

Expand Down Expand Up @@ -181,7 +181,7 @@ def test_overlay_points_errorbars(self, dynamic=False):
self.check_overlay_points_like(current_obj.ErrorBars.II, lnk_sel, self.data)

# Select first and third point
boundsxy = lnk_sel._selection_expr_streams[0]._source_streams[0]
boundsxy = lnk_sel._selection_expr_streams[0].input_streams[0]
boundsxy.event(bounds=(0, 0, 4, 2))

current_obj = linked[()]
Expand Down Expand Up @@ -228,7 +228,7 @@ def test_datashade_selection(self):
)

# Perform selection of second and third point
boundsxy = lnk_sel._selection_expr_streams[0]._source_streams[0]
boundsxy = lnk_sel._selection_expr_streams[0].input_streams[0]
self.assertIsInstance(boundsxy, SelectionXY)
boundsxy.event(bounds=(0, 1, 5, 5))
current_obj = linked[()]
Expand Down Expand Up @@ -265,7 +265,7 @@ def test_points_selection_streaming(self):
linked = lnk_sel(points)

# Perform selection of first and (future) third point
boundsxy = lnk_sel._selection_expr_streams[0]._source_streams[0]
boundsxy = lnk_sel._selection_expr_streams[0].input_streams[0]
self.assertIsInstance(boundsxy, hv.streams.SelectionXY)
boundsxy.event(bounds=(0, 0, 4, 2))
current_obj = linked[()]
Expand Down Expand Up @@ -350,7 +350,7 @@ def do_crossfilter_points_histogram(
self.assertEqual(region_hist.data, [None, None])

# (1) Perform selection on points of points [1, 2]
points_boundsxy = lnk_sel._selection_expr_streams[0]._source_streams[0]
points_boundsxy = lnk_sel._selection_expr_streams[0].input_streams[0]
self.assertIsInstance(points_boundsxy, SelectionXY)
points_boundsxy.event(bounds=(1, 1, 4, 4))

Expand Down Expand Up @@ -378,7 +378,7 @@ def do_crossfilter_points_histogram(
)

# (2) Perform selection on histogram bars [0, 1]
hist_boundsxy = lnk_sel._selection_expr_streams[1]._source_streams[0]
hist_boundsxy = lnk_sel._selection_expr_streams[1].input_streams[0]
self.assertIsInstance(hist_boundsxy, SelectionXY)
hist_boundsxy.event(bounds=(0, 0, 2.5, 2))

Expand Down Expand Up @@ -414,7 +414,7 @@ def do_crossfilter_points_histogram(
)

# (3) Perform selection on points points [0, 2]
points_boundsxy = lnk_sel._selection_expr_streams[0]._source_streams[0]
points_boundsxy = lnk_sel._selection_expr_streams[0].input_streams[0]
self.assertIsInstance(points_boundsxy, SelectionXY)
points_boundsxy.event(bounds=(0, 0, 4, 2.5))

Expand All @@ -440,7 +440,7 @@ def do_crossfilter_points_histogram(
self.assertEqual(region_hist.data, [0, 2.5])

# (4) Perform selection of bars [1, 2]
hist_boundsxy = lnk_sel._selection_expr_streams[1]._source_streams[0]
hist_boundsxy = lnk_sel._selection_expr_streams[1].input_streams[0]
self.assertIsInstance(hist_boundsxy, SelectionXY)
hist_boundsxy.event(bounds=(1.5, 0, 3.5, 2))

Expand Down
Loading

0 comments on commit f075725

Please sign in to comment.