EP-3555 add missing processes for callbacks #153

Merged
merged 9 commits into from
Sep 17, 2020
27 changes: 0 additions & 27 deletions docs/basics.rst
@@ -157,30 +157,3 @@ It can easily be converted into a pandas dataframe:

The same method also works for multiple polygons, or GeoJSON or SHP files that are
accessible by the backend. This allows computing aggregated values over very large areas.

Some openEO processes allow you to specify a process to be invoked on a subset of the datacube. This library supports this
by letting you specify a 'callback' function.


.. _callbackfunctions:

Callback functions
------------------

A callback function is created by defining an actual Python function, or a lambda expression. This function is then passed on
to processes such as :py:meth:`openeo.rest.datacube.DataCube.apply`, :py:meth:`openeo.rest.datacube.DataCube.apply_dimension`, :py:meth:`openeo.rest.datacube.DataCube.apply_neighborhood`,
and :py:meth:`openeo.rest.datacube.DataCube.reduce_dimension`.

This is an example:

.. code-block:: python

    datacube_absolute = cube.apply(process=lambda data: absolute(data))

For more complex operations, you can define a function:

.. code-block:: python

    def callback(data):
        return absolute(data)

198 changes: 198 additions & 0 deletions docs/processes.rst
@@ -240,3 +240,201 @@ The parameter listing of the example above could be written like this::
]



.. _callbackfunctions:

Processes with "callbacks"
==========================

Some openEO processes expect some kind of sub-process
to be invoked on a subset or slice of the datacube.
For example:

* process ``apply`` requires a transformation that will be applied
  to each pixel in the cube (separately)
* process ``reduce_dimension`` requires an aggregation function to convert
  an array of pixel values (along a given dimension) to a single value
* process ``apply_neighborhood`` requires a function to transform a small
  "neighborhood" cube to another

These transformation functions are usually called "**callbacks**"
because they are not called explicitly by the user:
they are called by their "parent" process
(``apply``, ``reduce_dimension`` and ``apply_neighborhood`` in the examples above).
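
As a plain-Python analogy (a minimal sketch, not the openEO API), the parent/callback relationship can be pictured as follows. The functions ``apply`` and ``reduce_dimension`` below are hypothetical stand-ins that operate on nested lists instead of a datacube:

```python
# Plain-Python analogy only: these helpers are hypothetical stand-ins,
# not functions from the openEO Python Client Library.

def apply(cube, process):
    """Parent process: call `process` on every pixel value separately."""
    return [[process(value) for value in row] for row in cube]


def reduce_dimension(cube, reducer):
    """Parent process: collapse each row (one dimension) to a single value."""
    return [reducer(row) for row in cube]


cube = [[-1.0, 2.0, -3.0], [4.0, -5.0, 6.0]]

# The user only hands over the callback; the parent decides when to call it.
absolute_cube = apply(cube, abs)        # callback: number -> number
row_max = reduce_dimension(cube, max)   # callback: array -> number
```

In both calls the callback (``abs``, ``max``) is passed without parentheses, and it is the parent function that invokes it.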


The openEO Python Client Library currently provides several methods
that expect a callback, including:
:py:meth:`openeo.rest.datacube.DataCube.apply`,
:py:meth:`openeo.rest.datacube.DataCube.apply_dimension`,
:py:meth:`openeo.rest.datacube.DataCube.apply_neighborhood`,
:py:meth:`openeo.rest.datacube.DataCube.merge_cubes`,
:py:meth:`openeo.rest.datacube.DataCube.reduce_dimension`,
and :py:meth:`openeo.rest.datacube.DataCube.load_collection`.
These functions support several ways to specify the desired callback.


Callback as string
------------------

The easiest way is passing a process name as a string,
for example:

.. code-block:: python

    # Take the absolute value of each pixel
    cube.apply("absolute")

    # Reduce a cube along the temporal dimension by taking the maximum value
    cube.reduce_dimension("max", dimension="t")

This approach is only possible if the desired transformation is available
as a single process. If not, use one of the methods below.

The "signature" of the provided callback process must also
match what the parent process expects.
For example: ``apply`` requires a callback process that receives a
number and returns one (like ``absolute`` or ``sqrt``),
while ``reduce_dimension`` requires a callback process that receives
an array of numbers and returns a single number (like ``max`` or ``mean``).
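
To make the signature requirement more concrete, here is a rough sketch of the kind of openEO "process graph" a string callback corresponds to. The helper ``string_callback_to_graph`` is hypothetical (it is not part of the client library), the node id is arbitrary, and the sketch assumes the callback's argument has the same name as the parameter provided by the parent (``x`` for ``apply``, ``data`` for ``reduce_dimension``):

```python
def string_callback_to_graph(process_id, parameter):
    """Hypothetical helper: wrap a single process in a child process graph.

    Assumption: the callback argument and the parent-provided parameter
    share the same name (e.g. "x" for apply, "data" for reduce_dimension).
    """
    return {
        "process_graph": {
            # The node id is an arbitrary label chosen for this sketch.
            process_id + "1": {
                "process_id": process_id,
                "arguments": {parameter: {"from_parameter": parameter}},
                "result": True,
            }
        }
    }


# `absolute` takes a single number "x", which is what `apply` hands over.
apply_callback = string_callback_to_graph("absolute", "x")

# `max` takes an array "data", which is what `reduce_dimension` hands over.
reduce_callback = string_callback_to_graph("max", "data")
```

Swapping the two (for example ``max`` under ``apply``) would leave the ``data`` argument pointing at a parameter the parent never provides, which is why the signatures have to line up.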


Callback as a callable
-----------------------

You can also specify the callback as a "callable":
a Python object that can be called (e.g. a function name without parentheses).

The openEO Python Client Library defines the
official processes in the :py:mod:`openeo.process.processes` module,
which can be used directly:

.. code-block:: python

    from openeo.processes import absolute, max

    cube.apply(absolute)
    cube.reduce_dimension(max, dimension="t")

You can also use ``lambda`` functions:

.. code-block:: python

    cube.apply(lambda x: x * 2 + 3)


or normal Python functions:

.. code-block:: python

    from openeo.processes import array_element

    def my_bandmath(data):
        band1 = array_element(data, index=1)
        band2 = array_element(data, index=2)
        return band1 + 1.2 * band2


    cube.reduce_dimension(my_bandmath, dimension="bands")


The argument that is passed to these functions is
an instance of :py:class:`openeo.processes.ProcessBuilder`.
This is a helper object with predefined methods for all standard processes,
which lets you define the callback in an object-oriented coding style.
For example:

.. code-block:: python

    from openeo.processes import ProcessBuilder

    def avg(data: ProcessBuilder):
        return data.mean()

    cube.reduce_dimension(avg, dimension="t")


These methods return ``ProcessBuilder`` objects as well,
which allows writing callbacks in chained fashion:

.. code-block:: python

    cube.apply(lambda x: x.absolute().cos().add(y=1.23))


All this gives a lot of flexibility to define callbacks compactly
in a desired coding style.
The following examples result in the same callback:

.. code-block:: python

    from openeo.processes import ProcessBuilder, mean, cos, add

    # Chained methods
    cube.reduce_dimension(
        lambda data: data.mean().cos().add(y=1.23),
        dimension="t"
    )

    # Functions
    cube.reduce_dimension(
        lambda data: add(x=cos(mean(data)), y=1.23),
        dimension="t"
    )

    # Mixing methods, functions and operators
    cube.reduce_dimension(
        lambda data: cos(data.mean()) + 1.23,
        dimension="t"
    )
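
The equivalence of the three styles can be checked with a much-reduced stand-in for ``ProcessBuilder``. This is only a sketch: the ``Builder`` class, the ``build`` helper and the nested-dict representation are hypothetical, and only the three processes used above are modelled:

```python
class Builder:
    """Hypothetical, much-reduced stand-in for ProcessBuilder."""

    def __init__(self, node):
        self.node = node  # nested dict describing the expression so far

    @classmethod
    def process(cls, process_id, **arguments):
        # Unwrap Builder arguments so that only plain data is stored.
        unwrapped = {
            name: value.node if isinstance(value, Builder) else value
            for name, value in arguments.items()
        }
        return cls({"process_id": process_id, "arguments": unwrapped})

    # Method style simply delegates to the function style below.
    def mean(self):
        return mean(self)

    def cos(self):
        return cos(self)

    def add(self, y):
        return add(self, y)

    # Operator style: `builder + 1.23` maps to the `add` process.
    def __add__(self, other):
        return add(self, other)


def mean(data):
    return Builder.process("mean", data=data)


def cos(x):
    return Builder.process("cos", x=x)


def add(x, y):
    return Builder.process("add", x=x, y=y)


def build(callback):
    """Call the callback once with a placeholder for the parent's `data`."""
    return callback(Builder({"from_parameter": "data"})).node


chained = build(lambda data: data.mean().cos().add(y=1.23))
functions = build(lambda data: add(x=cos(mean(data)), y=1.23))
mixed = build(lambda data: cos(data.mean()) + 1.23)
```

In this sketch all three variables end up holding the same nested structure, because methods, functions and operators all funnel into the same ``process`` call.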


Caveats
````````

Specifying callbacks through Python functions (or lambdas)
looks intuitive and straightforward, but it should be noted
that not everything is allowed in these functions.
You should just limit yourself to calling
:py:mod:`openeo.process.processes` functions, :py:class:`openeo.processes.ProcessBuilder` methods and basic math operators.
Don't call functions from other libraries like numpy or scipy.
Don't use Python control flow statements like ``if/else`` constructs
or ``for`` loops.

The reason for this is that the openEO Python Client Library
does not translate the function source code itself
to an openEO process graph.
Instead, when building the openEO process graph,
it passes a special object to the function
and keeps track of which :py:mod:`openeo.process.processes` functions
were called to assemble the corresponding process graph.
If you use control flow statements or numpy functions, for example,
this procedure cannot correctly detect what you want to do in the callback.
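
The following sketch illustrates why control flow goes wrong under this kind of call tracking. The ``Tracer`` class and the ``trace`` helper are hypothetical simplifications, not the library's implementation; they only record which operations were called on the placeholder object:

```python
class Tracer:
    """Hypothetical stand-in for the special object passed to a callback."""

    def __init__(self, node):
        self.node = node  # nested dict describing the recorded expression

    def _call(self, process_id, **arguments):
        unwrapped = {
            name: value.node if isinstance(value, Tracer) else value
            for name, value in arguments.items()
        }
        return Tracer({"process_id": process_id, "arguments": unwrapped})

    def absolute(self):
        return self._call("absolute", x=self)

    def __add__(self, other):
        return self._call("add", x=self, y=other)

    def __mul__(self, other):
        return self._call("multiply", x=self, y=other)

    def __gt__(self, other):
        # A comparison is recorded too; it does not produce True or False.
        return self._call("gt", x=self, y=other)


def trace(callback):
    """Run the callback once with a placeholder and return what was recorded."""
    return callback(Tracer({"from_parameter": "x"})).node


# Supported: only methods and operators of the placeholder are used.
good = trace(lambda x: x.absolute() * 2 + 3)


def with_branch(x):
    # Problem: `x > 0` is a Tracer object, which Python treats as truthy,
    # so this branch is always taken while the callback is being traced.
    if x > 0:
        return x * 2
    return x * -1


bad = trace(with_branch)
```

In this sketch ``bad`` only contains the ``multiply`` by 2: the ``if`` is evaluated once, in Python, on the placeholder object, and the other branch never reaches the recorded graph.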


Callback as ``PGNode``
-----------------------

You can also pass a ``PGNode`` object as callback.
This method is used internally and could be useful for more
advanced use cases, but it requires more in-depth knowledge of
the openEO API and openEO Python Client Library to construct correctly.
Some examples:

.. code-block:: python

    from openeo.internal.graph_building import PGNode

    cube.apply(PGNode(
        "add",
        x=PGNode(
            "cos",
            x=PGNode("absolute", x={"from_parameter": "x"})
        ),
        y=1.23
    ))

    cube.reduce_dimension(
        reducer=PGNode("max", data={"from_parameter": "data"}),
        dimension="bands"
    )
8 changes: 5 additions & 3 deletions openeo/internal/graph_building.py
@@ -12,7 +12,7 @@

class PGNode:
    """
    Generic node in a process graph.
    Wrapper for process node in a process graph (has process_id and arguments).

    While this is a simple, thin container, it allows a bit more abstraction, basic encapsulation,
    type hinting and code intelligence in your IDE than something generic like a dict.
@@ -71,14 +71,15 @@ def flatten(self):
        return GraphFlattener().flatten(node=self)

    @staticmethod
    def to_process_graph_argument(value: Union['PGNode', str, dict]):
    def to_process_graph_argument(value: Union['PGNode', str, dict]) -> dict:
        """
        Normalize given argument properly to a "process_graph" argument
        to be used as reducer/subprocess for processes like
        'reduce_dimension', 'aggregate_spatial', 'apply', 'merge_cubes', 'resample_cube_temporal'
        """
        if isinstance(value, str):
            # assume string with predefined reduce/apply process ("mean", "sum", ...)
            # TODO: is this case still used? It's invalid anyway for 1.0 openEO spec I think?
            return value
        elif isinstance(value, PGNode):
            return {"process_graph": value}
@@ -109,12 +110,13 @@ def __init__(self, code:str,runtime:str,data=None,version:str = None,context:Dic

        super().__init__(process_id='run_udf', arguments=arguments)


class ReduceNode(PGNode):
    """
    A process graph node for "reduce" processes (has a reducer sub-process-graph)
    """

    def __init__(self, data: PGNode, reducer: Union[PGNode, str], dimension: str, process_id="reduce_dimension",
    def __init__(self, data: PGNode, reducer: Union[PGNode, str, dict], dimension: str, process_id="reduce_dimension",
                 band_math_mode: bool = False):
        assert process_id in ("reduce_dimension", "reduce_dimension_binary")
        arguments = {
Empty file.
34 changes: 34 additions & 0 deletions openeo/internal/processes/builder.py
@@ -0,0 +1,34 @@
from typing import Union

from openeo.internal.graph_building import PGNode

UNSET = object()


class ProcessBuilderBase:
    """
    Base implementation of a builder pattern that allows constructing process graphs
    by calling functions.
    """

    # TODO: can this implementation be merged with PGNode directly?

    def __init__(self, pgnode: Union[PGNode, dict, list]):
        self.pgnode = pgnode

    @classmethod
    def process(cls, process_id: str, arguments: dict = None, **kwargs):
        """
        Apply process, using given arguments

        :param process_id: process id of the process.
        :param arguments: argument dictionary for the process.
        :return: new ProcessBuilder instance
        """
        arguments = {**(arguments or {}), **kwargs}
        for arg, value in arguments.items():
            if isinstance(value, ProcessBuilderBase):
                arguments[arg] = value.pgnode
        for arg in [a for a, v in arguments.items() if v is UNSET]:
            del arguments[arg]
        return cls(PGNode(process_id=process_id, arguments=arguments))
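
A short usage sketch of the ``process`` classmethod above, showing how builder arguments are unwrapped and how ``UNSET`` arguments are dropped. To keep it runnable on its own, ``PGNode`` is replaced here by a simplified, hypothetical stand-in that only stores ``process_id`` and ``arguments``; the real class in ``openeo.internal.graph_building`` does more:

```python
UNSET = object()


class PGNode:
    """Simplified stand-in for openeo.internal.graph_building.PGNode."""

    def __init__(self, process_id, arguments=None):
        self.process_id = process_id
        self.arguments = arguments or {}


class ProcessBuilderBase:
    """Copy of the builder from the diff above, without type hints."""

    def __init__(self, pgnode):
        self.pgnode = pgnode

    @classmethod
    def process(cls, process_id, arguments=None, **kwargs):
        arguments = {**(arguments or {}), **kwargs}
        # Replace builder arguments by the node (or dict) they wrap.
        for arg, value in arguments.items():
            if isinstance(value, ProcessBuilderBase):
                arguments[arg] = value.pgnode
        # Drop arguments that were left at the UNSET sentinel.
        for arg in [a for a, v in arguments.items() if v is UNSET]:
            del arguments[arg]
        return cls(PGNode(process_id=process_id, arguments=arguments))


# A builder wrapping a reference to the parent's "data" parameter.
data = ProcessBuilderBase({"from_parameter": "data"})

# `ignore_nodata` is left UNSET, so it does not end up in the arguments.
result = ProcessBuilderBase.process("mean", data=data, ignore_nodata=UNSET)
```

Using a dedicated sentinel object rather than ``None`` presumably keeps "argument not given" distinct from an explicit ``None`` value, which may itself be meaningful to a process.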