From ec982c2c79ca697946e0178626cf09eae70efc47 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 10 Oct 2024 22:12:34 +0200 Subject: [PATCH 01/22] Issue #604 initial PoC implementation of UDPJobFactory --- openeo/extra/job_management.py | 109 +++++++++++++++++++++++++- openeo/rest/_testing.py | 2 + openeo/rest/connection.py | 2 +- tests/extra/test_job_management.py | 122 +++++++++++++++++++++++++++++ 4 files changed, 233 insertions(+), 2 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index eb6dd86db..131c35632 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1,14 +1,17 @@ import abc import contextlib import datetime +import functools import json import logging +import re import time import warnings from pathlib import Path from threading import Thread from typing import Callable, Dict, List, NamedTuple, Optional, Union +import numpy import pandas as pd import requests import shapely.errors @@ -17,7 +20,7 @@ from openeo import BatchJob, Connection from openeo.rest import OpenEoApiError -from openeo.util import deep_get, rfc3339 +from openeo.util import deep_get, repr_truncate, rfc3339 _log = logging.getLogger(__name__) @@ -886,3 +889,107 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = else: raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.") return job_db + + +class UDPJobFactory: + """ + Batch job factory based on a parameterized process definition + (e.g a user-defined process (UDP) or a remote process definitions), + to be used together with :py:class:`MultiBackendJobManager`. + """ + + def __init__( + self, process_id: str, *, namespace: Union[str, None] = None, parameter_defaults: Optional[dict] = None + ): + self._process_id = process_id + self._namespace = namespace + self._parameter_defaults = parameter_defaults or {} + + def _get_process_definition(self, connection: Connection) -> dict: + if isinstance(self._namespace, str) and re.match("https?://", self._namespace): + return self._get_remote_process_definition() + elif self._namespace is None: + return connection.user_defined_process(self._process_id).describe() + else: + raise NotImplementedError( + f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}" + ) + + @functools.lru_cache() + def _get_remote_process_definition(self) -> dict: + """ + Get process definition based on "Remote Process Definition Extension" spec + https://github.com/Open-EO/openeo-api/tree/draft/extensions/remote-process-definition + """ + assert isinstance(self._namespace, str) and re.match("https?://", self._namespace) + resp = requests.get(url=self._namespace) + resp.raise_for_status() + data = resp.json() + if isinstance(data, list): + # Handle process listing: filter out right process + processes = [p for p in data if p.get("id") == self._process_id] + if len(processes) != 1: + raise ValueError(f"Process {self._process_id!r} not found at {self._namespace}") + (data,) = processes + + # Check for required fields of a process definition + if isinstance(data, dict) and "id" in data and "process_graph" in data: + process_definition = data + else: + raise ValueError(f"Invalid process definition at {self._namespace}") + + return process_definition + + def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: + """ + Implementation of the `start_job` callable interface for MultiBackendJobManager: + Create and start a job based on given dataframe row + 
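        (Editor's sketch, not part of the original commit: the factory is meant to be
        passed as the ``start_job`` callable of ``MultiBackendJobManager.run_jobs``.
        A minimal pairing, with a hypothetical remote process definition URL::

            factory = UDPJobFactory(
                process_id="increment",
                namespace="https://example.test/increment.json",
                parameter_defaults={"increment": 5},
            )
            manager = MultiBackendJobManager()
            manager.add_backend("dummy", connection=connection)
            job_db = CsvJobDatabase("jobs.csv").initialize_from_df(df)
            manager.run_jobs(job_db=job_db, start_job=factory)

        Each dataframe row then supplies the per-job parameter values.)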
+ :param row: The row in the pandas dataframe that stores the jobs state and other tracked data. + :param connection: The connection to the backend. + + :return: The started job. + """ + + process_definition = self._get_process_definition(connection=connection) + parameters = process_definition.get("parameters", []) + arguments = {} + for parameter in parameters: + name = parameter["name"] + schema = parameter.get("schema", {}) + if name in row.index: + # Higherst priority: value from dataframe row + value = row[name] + elif name in self._parameter_defaults: + # Fallback on default values from constructor + value = self._parameter_defaults[name] + else: + if parameter.get("optional", False): + continue + raise ValueError(f"Missing required parameter {name!r} for process {self._process_id!r}") + + # TODO: validation or normalization based on schema? + # Some pandas/numpy data types need a bit of conversion for JSON encoding + if isinstance(value, numpy.integer): + value = int(value) + elif isinstance(value, numpy.number): + value = float(value) + + arguments[name] = value + + cube = connection.datacube_from_process(process_id=self._process_id, namespace=self._namespace, **arguments) + + title = row.get("title", f"UDP {self._process_id!r} with {repr_truncate(parameters)}") + description = row.get( + "description", f"UDP {self._process_id!r} (namespace {self._namespace}) with {parameters}" + ) + job = connection.create_job(cube, title=title, description=description) + + return job + + def __call__(self, *arg, **kwargs) -> BatchJob: + """Syntactic sugar for calling `start_job` directly.""" + return self.start_job(*arg, **kwargs) + + + diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py index b44002687..6764a984a 100644 --- a/openeo/rest/_testing.py +++ b/openeo/rest/_testing.py @@ -5,6 +5,8 @@ from openeo import Connection, DataCube from openeo.rest.vectorcube import VectorCube +OPENEO_BACKEND = "https://openeo.test/" + class OpeneoTestingException(Exception): pass diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index 25ba304ac..65126cf9a 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -1732,7 +1732,7 @@ def execute( def create_job( self, - process_graph: Union[dict, str, Path], + process_graph: Union[dict, str, Path, FlatGraphableMixin], *, title: Optional[str] = None, description: Optional[str] = None, diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 87a483652..9e3b5e784 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -31,14 +31,26 @@ ParquetJobDatabase, create_job_db, get_job_db, + UDPJobFactory, ) +from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities from openeo.util import rfc3339 +@pytest.fixture +def con120(requests_mock) -> openeo.Connection: + requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0")) + con = openeo.Connection(OPENEO_BACKEND) + return con + + class FakeBackend: """ Fake openEO backend with some basic job management functionality for testing job manager logic. 
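    Sketch of typical use, mirroring the tests in this module (the root URL is
    the constructor default, not a real service):

        fake_backend = FakeBackend(requests_mock=requests_mock)
        connection = openeo.connect("http://openeo.test")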
""" + + # TODO: replace/merge with openeo.rest._testing.DummyBackend + def __init__(self, *, backend_root_url: str = "http://openeo.test", requests_mock): self.url = backend_root_url.rstrip("/") requests_mock.get(f"{self.url}/", json={"api_version": "1.1.0"}) @@ -957,3 +969,113 @@ def test_create_job_db(tmp_path, filename, expected): db = create_job_db(path=path, df=df) assert isinstance(db, expected) assert path.exists() + + +class TestUDPJobFactory: + @pytest.fixture + def dummy_backend(self, requests_mock, con120) -> DummyBackend: + return DummyBackend(requests_mock=requests_mock, connection=con120) + + @pytest.fixture(autouse=True) + def remote_process_definitions(self, requests_mock): + requests_mock.get( + "https://remote.test/3plus5.json", + json={ + "id": "3plus5", + "process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, + }, + ) + requests_mock.get( + "https://remote.test/increment.json", + json={ + "id": "increment", + "parameters": [ + {"name": "data", "schema": {"type": "number"}}, + {"name": "increment", "schema": {"type": "number"}, "optional": True, "default": 1}, + ], + "process_graph": { + "process_id": "add", + "arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}}, + "result": True, + }, + }, + ) + + def test_minimal(self, con120, dummy_backend): + """Bare minimum: just start a job, no parameters/arguments""" + job_factory = UDPJobFactory(process_id="3plus5", namespace="https://remote.test/3plus5.json") + + job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con120) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "3plus51": { + "process_id": "3plus5", + "namespace": "https://remote.test/3plus5.json", + "arguments": {}, + "result": True, + } + }, + "status": "created", + } + } + + def test_basic(self, con120, dummy_backend): + """Basic parameterized UDP job generation""" + job_factory = UDPJobFactory(process_id="increment", namespace="https://remote.test/increment.json") + + job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con120) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 123}, + "result": True, + } + }, + "status": "created", + } + } + + @pytest.mark.parametrize( + ["parameter_defaults", "row", "expected_arguments"], + [ + (None, {"data": 123}, {"data": 123}), + (None, {"data": 123, "increment": 5}, {"data": 123, "increment": 5}), + ({"increment": 5}, {"data": 123}, {"data": 123, "increment": 5}), + ({"increment": 5}, {"data": 123, "increment": 1000}, {"data": 123, "increment": 1000}), + ], + ) + def test_basic_parameterization(self, con120, dummy_backend, parameter_defaults, row, expected_arguments): + """Basic parameterized UDP job generation""" + job_factory = UDPJobFactory( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults=parameter_defaults, + ) + + job = job_factory.start_job(row=pd.Series(row), connection=con120) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments, + "result": True, + } + }, + "status": "created", + } + } + + + From 
4e0435115f1d6d504b7d6aef05599597e70701e8 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 12:51:14 +0200 Subject: [PATCH 02/22] Issue #604/#644 refactor out parse_remote_process_definition as standalone utility Finetune openeo.internal.processes.parse to properly support this (e.g. leverage named tuples for immutability, less boilerplate and out of the box equality checks) --- openeo/extra/job_management.py | 41 +++------ openeo/internal/processes/parse.py | 83 ++++++++++++------ tests/extra/test_job_management.py | 10 ++- tests/internal/processes/test_parse.py | 112 ++++++++++++++++++++++++- 4 files changed, 187 insertions(+), 59 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 131c35632..d10be9ac6 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -19,6 +19,7 @@ from requests.adapters import HTTPAdapter, Retry from openeo import BatchJob, Connection +from openeo.internal.processes.parse import Process, parse_remote_process_definition from openeo.rest import OpenEoApiError from openeo.util import deep_get, repr_truncate, rfc3339 @@ -905,40 +906,22 @@ def __init__( self._namespace = namespace self._parameter_defaults = parameter_defaults or {} - def _get_process_definition(self, connection: Connection) -> dict: + def _get_process_definition(self, connection: Connection) -> Process: if isinstance(self._namespace, str) and re.match("https?://", self._namespace): + # Remote process definition handling return self._get_remote_process_definition() elif self._namespace is None: - return connection.user_defined_process(self._process_id).describe() + # Handling of a user-specific UDP + udp_raw = connection.user_defined_process(self._process_id).describe() + return Process.from_dict(udp_raw) else: raise NotImplementedError( f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}" ) @functools.lru_cache() - def _get_remote_process_definition(self) -> dict: - """ - Get process definition based on "Remote Process Definition Extension" spec - https://github.com/Open-EO/openeo-api/tree/draft/extensions/remote-process-definition - """ - assert isinstance(self._namespace, str) and re.match("https?://", self._namespace) - resp = requests.get(url=self._namespace) - resp.raise_for_status() - data = resp.json() - if isinstance(data, list): - # Handle process listing: filter out right process - processes = [p for p in data if p.get("id") == self._process_id] - if len(processes) != 1: - raise ValueError(f"Process {self._process_id!r} not found at {self._namespace}") - (data,) = processes - - # Check for required fields of a process definition - if isinstance(data, dict) and "id" in data and "process_graph" in data: - process_definition = data - else: - raise ValueError(f"Invalid process definition at {self._namespace}") - - return process_definition + def _get_remote_process_definition(self) -> Process: + return parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id) def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: """ @@ -952,11 +935,11 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: """ process_definition = self._get_process_definition(connection=connection) - parameters = process_definition.get("parameters", []) + parameters = process_definition.parameters or [] arguments = {} for parameter in parameters: - name = parameter["name"] - schema = parameter.get("schema", {}) + 
name = parameter.name + schema = parameter.schema if name in row.index: # Higherst priority: value from dataframe row value = row[name] @@ -964,7 +947,7 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: # Fallback on default values from constructor value = self._parameter_defaults[name] else: - if parameter.get("optional", False): + if parameter.optional: continue raise ValueError(f"Missing required parameter {name!r} for process {self._process_id!r}") diff --git a/openeo/internal/processes/parse.py b/openeo/internal/processes/parse.py index afb97dfdb..2c3942f39 100644 --- a/openeo/internal/processes/parse.py +++ b/openeo/internal/processes/parse.py @@ -6,18 +6,18 @@ from __future__ import annotations import json +import re import typing from pathlib import Path -from typing import Iterator, List, Union +from typing import Any, Iterator, List, Optional, Union import requests -class Schema: +class Schema(typing.NamedTuple): """Schema description of an openEO process parameter or return value.""" - def __init__(self, schema: Union[dict, list]): - self.schema = schema + schema: Union[dict, list] @classmethod def from_dict(cls, data: dict) -> Schema: @@ -32,19 +32,18 @@ def is_process_graph(self) -> bool: ) -class Parameter: - """openEO process parameter""" +_NO_DEFAULT = object() + +class Parameter(typing.NamedTuple): + """openEO process parameter""" # TODO unify with openeo.api.process.Parameter? - NO_DEFAULT = object() - - def __init__(self, name: str, description: str, schema: Schema, default=NO_DEFAULT, optional: bool = False): - self.name = name - self.description = description - self.schema = schema - self.default = default - self.optional = optional + name: str + description: str + schema: Schema + default: Any = _NO_DEFAULT + optional: bool = False @classmethod def from_dict(cls, data: dict) -> Parameter: @@ -52,12 +51,12 @@ def from_dict(cls, data: dict) -> Parameter: name=data["name"], description=data["description"], schema=Schema.from_dict(data["schema"]), - default=data.get("default", cls.NO_DEFAULT), + default=data.get("default", _NO_DEFAULT), optional=data.get("optional", False), ) def has_default(self): - return self.default is not self.NO_DEFAULT + return self.default is not _NO_DEFAULT class Returns: @@ -73,13 +72,20 @@ def from_dict(cls, data: dict) -> Returns: class Process(typing.NamedTuple): - """An openEO process""" - + """ + Container for a opneEO process definition of an openEO process, + covering pre-defined processes, user-defined processes, + remote process definitions, etc. + """ + + # Common-denominator-wise only the process id is a required field in a process definition. + # Depending on the context in the openEO API, some other fields (e.g. "process_graph") + # may also be required. id: str - parameters: List[Parameter] - returns: Returns - description: str = "" - summary: str = "" + parameters: Optional[List[Parameter]] = None + returns: Optional[Returns] = None + description: Optional[str] = None + summary: Optional[str] = None # TODO: more properties? 
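    # Editor's illustration (not in the original commit): with the relaxed
    # optional fields, a bare remote process definition now round-trips:
    #     Process.from_dict({"id": "ndvi"})
    #     == Process(id="ndvi", parameters=None, returns=None, description=None, summary=None)
    # as exercised by test_parse_remote_process_definition_minimal in this patch.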
@classmethod @@ -87,10 +93,10 @@ def from_dict(cls, data: dict) -> Process: """Construct openEO process from dictionary values""" return cls( id=data["id"], - parameters=[Parameter.from_dict(d) for d in data["parameters"]], - returns=Returns.from_dict(data["returns"]), - description=data["description"], - summary=data["summary"], + parameters=[Parameter.from_dict(d) for d in data["parameters"]] if "parameters" in data else None, + returns=Returns.from_dict(data["returns"]) if "returns" in data else None, + description=data.get("description"), + summary=data.get("summary"), ) @classmethod @@ -114,3 +120,28 @@ def parse_all_from_dir(path: Union[str, Path], pattern="*.json") -> Iterator[Pro """Parse all openEO process files in given directory""" for p in sorted(Path(path).glob(pattern)): yield Process.from_json_file(p) + + +def parse_remote_process_definition(namespace: str, process_id: Optional[str] = None) -> Process: + """ + Parse a process definition as defined by the "Remote Process Definition Extension" spec + https://github.com/Open-EO/openeo-api/tree/draft/extensions/remote-process-definition + """ + if not re.match("https?://", namespace): + raise ValueError(f"Expected absolute URL, but got {namespace!r}") + + resp = requests.get(url=namespace) + resp.raise_for_status() + data = resp.json() + assert isinstance(data, dict) + + if "id" not in data and "processes" in data and isinstance(data["processes"], list): + # Handle process listing: filter out right process + if not isinstance(process_id, str): + raise ValueError(f"Working with process listing, but got invalid process id {process_id!r}") + processes = [p for p in data["processes"] if p.get("id") == process_id] + if len(processes) != 1: + raise LookupError(f"Process {process_id!r} not found in process listing {namespace!r}") + (data,) = processes + + return Process.from_dict(data) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 9e3b5e784..8e57212ae 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -990,8 +990,14 @@ def remote_process_definitions(self, requests_mock): json={ "id": "increment", "parameters": [ - {"name": "data", "schema": {"type": "number"}}, - {"name": "increment", "schema": {"type": "number"}, "optional": True, "default": 1}, + {"name": "data", "description": "data", "schema": {"type": "number"}}, + { + "name": "increment", + "description": "increment", + "schema": {"type": "number"}, + "optional": True, + "default": 1, + }, ], "process_graph": { "process_id": "add", diff --git a/tests/internal/processes/test_parse.py b/tests/internal/processes/test_parse.py index db2dbf351..2ed106a61 100644 --- a/tests/internal/processes/test_parse.py +++ b/tests/internal/processes/test_parse.py @@ -1,4 +1,13 @@ -from openeo.internal.processes.parse import Parameter, Process, Returns, Schema +import pytest + +from openeo.internal.processes.parse import ( + _NO_DEFAULT, + Parameter, + Process, + Returns, + Schema, + parse_remote_process_definition, +) def test_schema(): @@ -6,6 +15,13 @@ def test_schema(): assert s.schema == {"type": "number"} +def test_schema_equality(): + assert Schema({"type": "number"}) == Schema({"type": "number"}) + assert Schema({"type": "number"}) == Schema.from_dict({"type": "number"}) + + assert Schema({"type": "number"}) != Schema({"type": "string"}) + + def test_parameter(): p = Parameter.from_dict({ "name": "foo", @@ -15,7 +31,7 @@ def test_parameter(): assert p.name == "foo" assert p.description == "Foo amount" 
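    # "default" was omitted from the input dict above, so it falls back to the
    # module-private _NO_DEFAULT sentinel (renamed from Parameter.NO_DEFAULT in this patch):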
assert p.schema.schema == {"type": "number"} - assert p.default is Parameter.NO_DEFAULT + assert p.default is _NO_DEFAULT assert p.optional is False @@ -39,6 +55,14 @@ def test_parameter_default_none(): assert p.default is None +def test_parameter_equality(): + p1 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "number"}}) + p2 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "number"}}) + p3 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "string"}}) + assert p1 == p2 + assert p1 != p3 + + def test_returns(): r = Returns.from_dict({ "description": "Roo", @@ -98,3 +122,87 @@ def test_process_from_json(): assert p.parameters[0].schema.schema == {"type": ["number", "null"]} assert p.returns.description == "The computed absolute value." assert p.returns.schema.schema == {"type": ["number", "null"], "minimum": 0} + + +def test_parse_remote_process_definition_minimal(requests_mock): + url = "https://example.com/ndvi.json" + requests_mock.get(url, json={"id": "ndvi"}) + process = parse_remote_process_definition(url) + assert process.id == "ndvi" + assert process.parameters is None + assert process.returns is None + assert process.description is None + assert process.summary is None + + +def test_parse_remote_process_definition_parameters(requests_mock): + url = "https://example.com/ndvi.json" + requests_mock.get( + url, + json={ + "id": "ndvi", + "parameters": [ + {"name": "incr", "description": "Increment", "schema": {"type": "number"}}, + {"name": "scales", "description": "Scales", "default": [1, 1], "schema": {"type": "number"}}, + ], + }, + ) + process = parse_remote_process_definition(url) + assert process.id == "ndvi" + assert process.parameters == [ + Parameter(name="incr", description="Increment", schema=Schema({"type": "number"})), + Parameter(name="scales", description="Scales", default=[1, 1], schema=Schema({"type": "number"})), + ] + assert process.returns is None + assert process.description is None + assert process.summary is None + + +def test_parse_remote_process_definition_listing(requests_mock): + url = "https://example.com/processes.json" + requests_mock.get( + url, + json={ + "processes": [ + { + "id": "ndvi", + "parameters": [{"name": "incr", "description": "Incr", "schema": {"type": "number"}}], + }, + { + "id": "scale", + "parameters": [ + {"name": "factor", "description": "Factor", "default": 1, "schema": {"type": "number"}} + ], + }, + ], + "links": [], + }, + ) + + # No process id given + with pytest.raises(ValueError, match="Working with process listing, but got invalid process id None"): + parse_remote_process_definition(url) + + # Process id not found + with pytest.raises(LookupError, match="Process 'mehblargh' not found in process listing"): + parse_remote_process_definition(url, process_id="mehblargh") + + # Valid proces id + process = parse_remote_process_definition(url, process_id="ndvi") + assert process.id == "ndvi" + assert process.parameters == [ + Parameter(name="incr", description="Incr", schema=Schema({"type": "number"})), + ] + assert process.returns is None + assert process.description is None + assert process.summary is None + + # Another proces id + process = parse_remote_process_definition(url, process_id="scale") + assert process.id == "scale" + assert process.parameters == [ + Parameter(name="factor", description="Factor", default=1, schema=Schema({"type": "number"})), + ] + assert process.returns is None + assert process.description is None + assert 
process.summary is None From 04bc791177526cbeb5494af5e579d7cabe415d14 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 13:59:46 +0200 Subject: [PATCH 03/22] Issue #604/#644 explicitly use default value from schema as fallback improves long term traceability and observability --- openeo/extra/job_management.py | 10 +++++++--- tests/extra/test_job_management.py | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index d10be9ac6..7dd43bfed 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -895,7 +895,7 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = class UDPJobFactory: """ Batch job factory based on a parameterized process definition - (e.g a user-defined process (UDP) or a remote process definitions), + (e.g a user-defined process (UDP) or a remote process definition), to be used together with :py:class:`MultiBackendJobManager`. """ @@ -946,9 +946,13 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: elif name in self._parameter_defaults: # Fallback on default values from constructor value = self._parameter_defaults[name] + elif parameter.has_default(): + # Explicitly use default value from parameter schema + value = parameter.default + elif parameter.optional: + # Skip optional parameters without any fallback default value + continue else: - if parameter.optional: - continue raise ValueError(f"Missing required parameter {name!r} for process {self._process_id!r}") # TODO: validation or normalization based on schema? diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 8e57212ae..6a93b26b1 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1041,7 +1041,7 @@ def test_basic(self, con120, dummy_backend): "increment1": { "process_id": "increment", "namespace": "https://remote.test/increment.json", - "arguments": {"data": 123}, + "arguments": {"data": 123, "increment": 1}, "result": True, } }, @@ -1052,7 +1052,7 @@ def test_basic(self, con120, dummy_backend): @pytest.mark.parametrize( ["parameter_defaults", "row", "expected_arguments"], [ - (None, {"data": 123}, {"data": 123}), + (None, {"data": 123}, {"data": 123, "increment": 1}), (None, {"data": 123, "increment": 5}, {"data": 123, "increment": 5}), ({"increment": 5}, {"data": 123}, {"data": 123, "increment": 5}), ({"increment": 5}, {"data": 123, "increment": 1000}, {"data": 123, "increment": 1000}), From ff8b553b954e1ef1c6db0be43f094194ddd3cec3 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 14:39:02 +0200 Subject: [PATCH 04/22] Issue #604/#644 add UDPJobFactory+MultiBackendJobManager tests --- tests/extra/test_job_management.py | 70 ++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 4 deletions(-) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 6a93b26b1..de751c218 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -85,13 +85,14 @@ def _handle_cancel_job(self, request, context): context.status_code = 204 +@pytest.fixture +def sleep_mock(): + with mock.patch("time.sleep") as sleep: + yield sleep + class TestMultiBackendJobManager: - @pytest.fixture - def sleep_mock(self): - with mock.patch("time.sleep") as sleep: - yield sleep def test_basic_legacy(self, tmp_path, requests_mock, sleep_mock): """ @@ -1084,4 +1085,65 @@ def 
test_basic_parameterization(self, con120, dummy_backend, parameter_defaults, } + @pytest.fixture + def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: + job_manager = MultiBackendJobManager(root_dir=tmp_path / "job_mgr_root") + job_manager.add_backend("dummy", connection=dummy_backend.connection) + return job_manager + def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): + job_starter = UDPJobFactory( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults={"increment": 5}, + ) + + df = pd.DataFrame({"data": [1, 2, 3]}) + job_db = CsvJobDatabase(tmp_path / "jobs.csv") + # TODO #636 avoid this cumbersome pattern using private _normalize_df API + job_db.persist(job_manager._normalize_df(df)) + + job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert sleep_mock.call_count > 0 + + result = job_db.read() + assert set(result.status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 1, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 2, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + "job-002": { + "job_id": "job-002", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 3, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + } From 64cafcf28e58531717b55f7978f229e762ca88c1 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 16:03:51 +0200 Subject: [PATCH 05/22] Issue #645 introduce returning event stats from MultiBackendJobManager.run_jobs --- CHANGELOG.md | 3 +- openeo/extra/job_management.py | 60 +++++++++++++++++++++----- tests/extra/test_job_management.py | 68 ++++++++++++++++++++++++------ 3 files changed, 106 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f8d01fc57..e91aaabfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame. Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension. ([#635](https://github.com/Open-EO/openeo-python-client/issues/635)) - - +- `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the job run ([#645](https://github.com/Open-EO/openeo-python-client/issues/645)) ### Changed diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 7dd43bfed..bc1b9ff8e 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1,4 +1,5 @@ import abc +import collections import contextlib import datetime import functools @@ -380,7 +381,7 @@ def run_jobs( start_job: Callable[[], BatchJob] = _start_job_default, job_db: Union[str, Path, JobDatabaseInterface, None] = None, **kwargs, - ): + ) -> dict: """Runs jobs, specified in a dataframe, and tracks parameters. 
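        (Editor's sketch of the return value introduced by this patch; key names
        are experimental and counts depend on the run::

            stats = manager.run_jobs(job_db=job_db, start_job=start_job)
            # e.g. {"run_jobs loop": 8, "start_job call": 7, "job finished": 5, ...}

        The accompanying tests assert on such dicts with ``dirty_equals.IsPartialDict``.)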
:param df: @@ -422,6 +423,10 @@ def run_jobs( Support for Parquet files depends on the ``pyarrow`` package as :ref:`optional dependency `. + :return: dictionary with stats collected during the job running loop. + Note that the set of fields in this dictionary is experimental + and subject to change + .. versionchanged:: 0.31.0 Added support for persisting the job metadata in Parquet format. @@ -430,6 +435,9 @@ def run_jobs( which can be a path to a CSV or Parquet file, or a user-defined :py:class:`JobDatabaseInterface` object. The deprecated ``output_file`` argument is still supported for now. + + .. versionchanged:: 0.33.0 + return a stats dictionary """ # TODO: Defining start_jobs as a Protocol might make its usage more clear, and avoid complicated doctrings, # but Protocols are only supported in Python 3.8 and higher. @@ -457,23 +465,35 @@ def run_jobs( # TODO: start showing deprecation warnings for this usage pattern? job_db.initialize_from_df(df) + stats = collections.defaultdict(int) while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0: - self._job_update_loop(job_db=job_db, start_job=start_job) + self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats) + stats["run_jobs loop"] += 1 + time.sleep(self.poll_sleep) + stats["sleep"] += 1 - def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob]): + return stats + + def _job_update_loop( + self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob], stats: Optional[dict] = None + ): """ Inner loop logic of job management: go through the necessary jobs to check for status updates, trigger status events, start new jobs when there is room for them, etc. """ + stats = stats if stats is not None else collections.defaultdict(int) + with ignore_connection_errors(context="get statuses"): - self._track_statuses(job_db) + self._track_statuses(job_db, stats=stats) + stats["track_statuses"] += 1 not_started = job_db.get_by_status(statuses=["not_started"], max=200) if len(not_started) > 0: # Check number of jobs running at each backend running = job_db.get_by_status(statuses=["created", "queued", "running"]) + stats["job_db get_by_status"] += 1 per_backend = running.groupby("backend_name").size().to_dict() _log.info(f"Running per backend: {per_backend}") for backend_name in self.backends: @@ -482,10 +502,13 @@ def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[], to_add = self.backends[backend_name].parallel_jobs - backend_load to_launch = not_started.iloc[0:to_add] for i in to_launch.index: - self._launch_job(start_job, not_started, i, backend_name) + self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats) + stats["job launch"] += 1 + job_db.persist(to_launch) + stats["job_db persist"] += 1 - def _launch_job(self, start_job, df, i, backend_name): + def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None): """Helper method for launching jobs :param start_job: @@ -508,6 +531,7 @@ def _launch_job(self, start_job, df, i, backend_name): :param backend_name: name of the backend that will execute the job. 
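        :param stats: (editor's note: parameter added by this patch but not yet
            documented here) optional mutable dict of event counters that is
            updated in place (e.g. ``stats["start_job call"] += 1``); when not
            given, a throwaway ``collections.defaultdict(int)`` is used.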
""" + stats = stats if stats is not None else collections.defaultdict(int) df.loc[i, "backend_name"] = backend_name row = df.loc[i] @@ -515,6 +539,7 @@ def _launch_job(self, start_job, df, i, backend_name): _log.info(f"Starting job on backend {backend_name} for {row.to_dict()}") connection = self._get_connection(backend_name, resilient=True) + stats["start_job call"] += 1 job = start_job( row=row, connection_provider=self._get_connection, @@ -524,23 +549,30 @@ def _launch_job(self, start_job, df, i, backend_name): except requests.exceptions.ConnectionError as e: _log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True) df.loc[i, "status"] = "start_failed" + stats["start_job error"] += 1 else: df.loc[i, "start_time"] = rfc3339.utcnow() if job: df.loc[i, "id"] = job.job_id with ignore_connection_errors(context="get status"): status = job.status() + stats["job get status"] += 1 df.loc[i, "status"] = status if status == "created": # start job if not yet done by callback try: job.start() + stats["job start"] += 1 df.loc[i, "status"] = job.status() + stats["job get status"] += 1 except OpenEoApiError as e: _log.error(e) df.loc[i, "status"] = "start_failed" + stats["job start error"] += 1 else: + # TODO: what is this "skipping" about actually? df.loc[i, "status"] = "skipped" + stats["start_job skipped"] += 1 def on_job_done(self, job: BatchJob, row): """ @@ -623,11 +655,13 @@ def ensure_job_dir_exists(self, job_id: str) -> Path: if not job_dir.exists(): job_dir.mkdir(parents=True) - def _track_statuses(self, job_db: JobDatabaseInterface): + def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] = None): """ Tracks status (and stats) of running jobs (in place). Optionally cancels jobs when running too long. """ + stats = stats if stats is not None else collections.defaultdict(int) + active = job_db.get_by_status(statuses=["created", "queued", "running"]) for i in active.index: job_id = active.loc[i, "id"] @@ -638,6 +672,7 @@ def _track_statuses(self, job_db: JobDatabaseInterface): con = self._get_connection(backend_name) the_job = con.job(job_id) job_metadata = the_job.describe() + stats["job describe"] += 1 new_status = job_metadata["status"] _log.info( @@ -645,15 +680,19 @@ def _track_statuses(self, job_db: JobDatabaseInterface): ) if new_status == "finished": + stats["job finished"] += 1 self.on_job_done(the_job, active.loc[i]) if previous_status != "error" and new_status == "error": + stats["job failed"] += 1 self.on_job_error(the_job, active.loc[i]) if previous_status in {"created", "queued"} and new_status == "running": + stats["job started running"] += 1 active.loc[i, "running_start_time"] = rfc3339.utcnow() if new_status == "canceled": + stats["job canceled"] += 1 self.on_job_cancel(the_job, active.loc[i]) if self._cancel_running_job_after and new_status == "running": @@ -667,10 +706,14 @@ def _track_statuses(self, job_db: JobDatabaseInterface): active.loc[i, key] = _format_usage_stat(job_metadata, key) except OpenEoApiError as e: + stats["job tracking error"] += 1 print(f"error for job {job_id!r} on backend {backend_name}") print(e) + + stats["job_db persist"] += 1 job_db.persist(active) + def _format_usage_stat(job_metadata: dict, field: str) -> str: value = deep_get(job_metadata, "usage", field, "value", default=0) unit = deep_get(job_metadata, "usage", field, "unit", default="") @@ -977,6 +1020,3 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: def __call__(self, *arg, **kwargs) -> BatchJob: """Syntactic sugar for 
calling `start_job` directly.""" return self.start_job(*arg, **kwargs) - - - diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index de751c218..086faf3a3 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -29,9 +29,9 @@ CsvJobDatabase, MultiBackendJobManager, ParquetJobDatabase, + UDPJobFactory, create_job_db, get_job_db, - UDPJobFactory, ) from openeo.rest._testing import OPENEO_BACKEND, DummyBackend, build_capabilities from openeo.util import rfc3339 @@ -113,8 +113,17 @@ def start_job(row, connection, **kwargs): year = int(row["year"]) return BatchJob(job_id=f"job-{year}", connection=connection) - manager.run_jobs(df=df, start_job=start_job, output_file=output_file) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + assert run_stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=10), + "start_job call": 7, # TODO? + "job started running": 5, + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + "run_jobs loop": dirty_equals.IsInt(gt=5), + } + ) result = pd.read_csv(output_file) assert len(result) == 5 @@ -148,8 +157,17 @@ def start_job(row, connection, **kwargs): job_db = CsvJobDatabase(output_file).initialize_from_df(df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=10), + "start_job call": 7, # TODO? + "job started running": 5, + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + "run_jobs loop": dirty_equals.IsInt(gt=5), + } + ) result = pd.read_csv(output_file) assert len(result) == 5 @@ -176,8 +194,14 @@ def start_job(row, connection, **kwargs): output_file = tmp_path / "jobs.db" job_db = db_class(output_file).initialize_from_df(df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 7, # TODO? + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + } + ) result = job_db.read() assert len(result) == 5 @@ -205,8 +229,14 @@ def start_job(row, connection, **kwargs): output_file = tmp_path / filename job_db = create_job_db(path=output_file, df=df) - manager.run_jobs(job_db=job_db, start_job=start_job) - assert sleep_mock.call_count > 10 + run_stats = manager.run_jobs(job_db=job_db, start_job=start_job) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 7, # TODO? + "job finished": 5, + "job_db persist": dirty_equals.IsInt(gt=5), + } + ) result = job_db.read() assert len(result) == 5 @@ -235,6 +265,7 @@ def start_job(row, connection, **kwargs): # Trigger context switch to job thread sleep(1) manager.stop_job_thread() + # TODO #645 how to collect stats with the threaded run_job? 
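        # (Editor's sketch of a possible approach, not implemented in this patch:
        # share a mutable counter dict with the worker thread, e.g.
        #     stats = collections.defaultdict(int)
        # and pass it through to `_job_update_loop(job_db=..., start_job=..., stats=stats)`.)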
assert sleep_mock.call_count > 10 result = pd.read_csv(output_file) @@ -543,8 +574,12 @@ def start_job(row, connection_provider, connection, **kwargs): output_file = tmp_path / "jobs.csv" - manager.run_jobs(df=df, start_job=start_job, output_file=output_file) - assert sleep_mock.call_count > 3 + run_stats = manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + assert run_stats == dirty_equals.IsPartialDict( + { + "start_job call": 1, + } + ) # Sanity check: the job succeeded result = pd.read_csv(output_file) @@ -615,6 +650,7 @@ def start_job(row, connection_provider, connection, **kwargs): with pytest.raises(requests.exceptions.RetryError) as exc: manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + # TODO #645 how to still check stats when run_jobs raised exception? assert sleep_mock.call_count > 3 # Sanity check: the job has status "error" @@ -1103,8 +1139,14 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job # TODO #636 avoid this cumbersome pattern using private _normalize_df API job_db.persist(job_manager._normalize_df(df)) - job_manager.run_jobs(job_db=job_db, start_job=job_starter) - assert sleep_mock.call_count > 0 + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 3, + "job start": 3, + } + ) result = job_db.read() assert set(result.status) == {"finished"} From 04296c1a5822971cd46066be1cfcdd156282739c Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 16:18:05 +0200 Subject: [PATCH 06/22] Issue #604/#644 more UDPJobFactory+MultiBackendJobManager tests --- tests/extra/test_job_management.py | 109 +++++++++++++++++++++++++++-- 1 file changed, 103 insertions(+), 6 deletions(-) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 086faf3a3..e3d8aba66 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1135,9 +1135,7 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job ) df = pd.DataFrame({"data": [1, 2, 3]}) - job_db = CsvJobDatabase(tmp_path / "jobs.csv") - # TODO #636 avoid this cumbersome pattern using private _normalize_df API - job_db.persist(job_manager._normalize_df(df)) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) assert stats == dirty_equals.IsPartialDict( @@ -1147,9 +1145,7 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job "job start": 3, } ) - - result = job_db.read() - assert set(result.status) == {"finished"} + assert set(job_db.read().status) == {"finished"} assert dummy_backend.batch_jobs == { "job-000": { @@ -1189,3 +1185,104 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job "status": "finished", }, } + + @pytest.mark.parametrize( + ["parameter_defaults", "df_data", "expected_arguments"], + [ + ( + {"increment": 5}, + {"data": [1, 2, 3]}, + { + "job-000": {"data": 1, "increment": 5}, + "job-001": {"data": 2, "increment": 5}, + "job-002": {"data": 3, "increment": 5}, + }, + ), + ( + None, + {"data": [1, 2, 3], "increment": [44, 55, 66]}, + { + "job-000": {"data": 1, "increment": 44}, + "job-001": {"data": 2, "increment": 55}, + "job-002": {"data": 3, "increment": 66}, + }, + ), + ( + {"increment": 5555}, + {"data": [1, 2, 3], "increment": [44, 55, 66]}, + { + "job-000": {"data": 1, 
"increment": 44}, + "job-001": {"data": 2, "increment": 55}, + "job-002": {"data": 3, "increment": 66}, + }, + ), + ], + ) + def test_udp_job_manager_parameter_handling( + self, + tmp_path, + requests_mock, + dummy_backend, + job_manager, + sleep_mock, + parameter_defaults, + df_data, + expected_arguments, + ): + job_starter = UDPJobFactory( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_defaults=parameter_defaults, + ) + + df = pd.DataFrame(df_data) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 3, + "job start": 3, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-000"], + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-001"], + "result": True, + } + }, + "status": "finished", + }, + "job-002": { + "job_id": "job-002", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": expected_arguments["job-002"], + "result": True, + } + }, + "status": "finished", + }, + } From f8db8771bbda25f031c6772d0930efdebff86d86 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 20:11:23 +0200 Subject: [PATCH 07/22] Issue #604/#644 UDPJobFactory: improve geometry support --- openeo/extra/job_management.py | 65 ++++++++++++--- openeo/internal/processes/parse.py | 12 +++ openeo/rest/_testing.py | 36 +++++++- tests/extra/test_job_management.py | 109 ++++++++++++++++++++++++- tests/internal/processes/test_parse.py | 14 ++++ tests/rest/test_testing.py | 64 +++++++++++++++ 6 files changed, 282 insertions(+), 18 deletions(-) create mode 100644 tests/rest/test_testing.py diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index bc1b9ff8e..0697201ab 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -16,11 +16,16 @@ import pandas as pd import requests import shapely.errors +import shapely.geometry.base import shapely.wkt from requests.adapters import HTTPAdapter, Retry from openeo import BatchJob, Connection -from openeo.internal.processes.parse import Process, parse_remote_process_definition +from openeo.internal.processes.parse import ( + Parameter, + Process, + parse_remote_process_definition, +) from openeo.rest import OpenEoApiError from openeo.util import deep_get, repr_truncate, rfc3339 @@ -943,11 +948,17 @@ class UDPJobFactory: """ def __init__( - self, process_id: str, *, namespace: Union[str, None] = None, parameter_defaults: Optional[dict] = None + self, + process_id: str, + *, + namespace: Union[str, None] = None, + parameter_defaults: Optional[dict] = None, + parameter_column_map: Optional[dict] = None, ): self._process_id = process_id self._namespace = namespace self._parameter_defaults = parameter_defaults or {} + self._parameter_column_map = parameter_column_map def _get_process_definition(self, connection: Connection) -> Process: if isinstance(self._namespace, str) and re.match("https?://", 
self._namespace): @@ -979,16 +990,20 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: process_definition = self._get_process_definition(connection=connection) parameters = process_definition.parameters or [] + + if self._parameter_column_map is None: + self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row) + arguments = {} for parameter in parameters: - name = parameter.name - schema = parameter.schema - if name in row.index: - # Higherst priority: value from dataframe row - value = row[name] - elif name in self._parameter_defaults: + param_name = parameter.name + column_name = self._parameter_column_map.get(param_name, param_name) + if column_name in row.index: + # Get value from dataframe row + value = row.loc[column_name] + elif param_name in self._parameter_defaults: # Fallback on default values from constructor - value = self._parameter_defaults[name] + value = self._parameter_defaults[param_name] elif parameter.has_default(): # Explicitly use default value from parameter schema value = parameter.default @@ -996,16 +1011,17 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: # Skip optional parameters without any fallback default value continue else: - raise ValueError(f"Missing required parameter {name!r} for process {self._process_id!r}") + raise ValueError(f"Missing required parameter {param_name !r} for process {self._process_id!r}") - # TODO: validation or normalization based on schema? - # Some pandas/numpy data types need a bit of conversion for JSON encoding + # Prepare some values/dtypes for JSON encoding if isinstance(value, numpy.integer): value = int(value) elif isinstance(value, numpy.number): value = float(value) + elif isinstance(value, shapely.geometry.base.BaseGeometry): + value = shapely.geometry.mapping(value) - arguments[name] = value + arguments[param_name] = value cube = connection.datacube_from_process(process_id=self._process_id, namespace=self._namespace, **arguments) @@ -1020,3 +1036,26 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: def __call__(self, *arg, **kwargs) -> BatchJob: """Syntactic sugar for calling `start_job` directly.""" return self.start_job(*arg, **kwargs) + + @staticmethod + def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict: + """ + Guess parameter-column mapping from given parameter list and dataframe row + """ + parameter_column_map = {} + # Geometry based mapping: try to automatically map geometry columns to geojson parameters + geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()] + geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)] + if geojson_parameters and geometry_columns: + if len(geojson_parameters) == 1 and len(geometry_columns) == 1: + # Most common case: one geometry parameter and one geometry column: can be mapped naively + parameter_column_map[geojson_parameters[0]] = geometry_columns[0] + elif all(p in geometry_columns for p in geojson_parameters): + # Each geometry param has geometry column with same name: easy to map + parameter_column_map.update((p, p) for p in geojson_parameters) + else: + raise RuntimeError( + f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})" + ) + _log.debug(f"Guessed parameter-column map: {parameter_column_map}") + return parameter_column_map diff --git a/openeo/internal/processes/parse.py 
b/openeo/internal/processes/parse.py index 2c3942f39..0d62fe4cf 100644 --- a/openeo/internal/processes/parse.py +++ b/openeo/internal/processes/parse.py @@ -31,6 +31,18 @@ def is_process_graph(self) -> bool: and self.schema.get("subtype") == "process-graph" ) + def accepts_geojson(self) -> bool: + """Does this schema accept inline GeoJSON objects?""" + + def is_geojson_schema(schema) -> bool: + return isinstance(schema, dict) and schema.get("type") == "object" and schema.get("subtype") == "geojson" + + if isinstance(self.schema, dict): + return is_geojson_schema(self.schema) + elif isinstance(self.schema, list): + return any(is_geojson_schema(s) for s in self.schema) + return False + _NO_DEFAULT = object() diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py index 6764a984a..89f9e30f9 100644 --- a/openeo/rest/_testing.py +++ b/openeo/rest/_testing.py @@ -1,6 +1,7 @@ +import collections import json import re -from typing import Optional, Union +from typing import Callable, Iterator, Optional, Sequence, Union from openeo import Connection, DataCube from openeo.rest.vectorcube import VectorCube @@ -25,6 +26,7 @@ class DummyBackend: "validation_requests", "next_result", "next_validation_errors", + "job_status_updater", ) # Default result (can serve both as JSON or binary data) @@ -37,6 +39,13 @@ def __init__(self, requests_mock, connection: Connection): self.validation_requests = [] self.next_result = self.DEFAULT_RESULT self.next_validation_errors = [] + + # Job status update hook: + # callable that is called on starting a job, and getting job metadata + # allows to dynamically change how the status of a job evolves + # By default: immediately set to "finished" once job is started + self.job_status_updater = lambda job_id, current_status: "finished" + requests_mock.post( connection.build_url("/result"), content=self._handle_post_result, @@ -90,13 +99,19 @@ def _handle_post_job_results(self, request, context): """Handler of `POST /job/{job_id}/results` (start batch job).""" job_id = self._get_job_id(request) assert self.batch_jobs[job_id]["status"] == "created" - # TODO: support custom status sequence (instead of directly going to status "finished")? - self.batch_jobs[job_id]["status"] = "finished" + self.batch_jobs[job_id]["status"] = self.job_status_updater( + job_id=job_id, current_status=self.batch_jobs[job_id]["status"] + ) context.status_code = 202 def _handle_get_job(self, request, context): """Handler of `GET /job/{job_id}` (get batch job status and metadata).""" job_id = self._get_job_id(request) + # Allow updating status with `job_status_setter` once job got past status "created" + if self.batch_jobs[job_id]["status"] != "created": + self.batch_jobs[job_id]["status"] = self.job_status_updater( + job_id=job_id, current_status=self.batch_jobs[job_id]["status"] + ) return {"id": job_id, "status": self.batch_jobs[job_id]["status"]} def _handle_get_job_results(self, request, context): @@ -162,6 +177,21 @@ def execute(self, cube: Union[DataCube, VectorCube], process_id: Optional[str] = cube.execute() return self.get_pg(process_id=process_id) + def setup_simple_job_status_flow(self, *, queued: int = 1, running: int = 4, final: str = "finished"): + """ + Set up simple job status flow: + queued (a couple of times) -> running (a couple of times) -> finished/error. 
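        E.g. with ``queued=2, running=3`` each consecutive status update yields:
        "queued", "queued", "running", "running", "running", "finished",
        after which the final status repeats forever (the last stack entry
        is never popped).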
+ """ + template = ["queued"] * queued + ["running"] * running + [final] + job_stacks = collections.defaultdict(template.copy) + + def get_status(job_id: str, current_status: str) -> str: + stack = job_stacks[job_id] + # Pop first item each time, but repeat the last one at the end + return stack.pop(0) if len(stack) > 1 else stack[0] + + self.job_status_updater = get_status + def build_capabilities( *, diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index e3d8aba66..c41663b39 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1011,7 +1011,9 @@ def test_create_job_db(tmp_path, filename, expected): class TestUDPJobFactory: @pytest.fixture def dummy_backend(self, requests_mock, con120) -> DummyBackend: - return DummyBackend(requests_mock=requests_mock, connection=con120) + dummy = DummyBackend(requests_mock=requests_mock, connection=con120) + dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished") + return dummy @pytest.fixture(autouse=True) def remote_process_definitions(self, requests_mock): @@ -1043,6 +1045,31 @@ def remote_process_definitions(self, requests_mock): }, }, ) + requests_mock.get( + "https://remote.test/offset_poplygon.json", + json={ + "id": "offset_poplygon", + "parameters": [ + {"name": "data", "description": "data", "schema": {"type": "number"}}, + { + "name": "polygons", + "description": "polygons", + "schema": { + "title": "GeoJSON", + "type": "object", + "subtype": "geojson", + }, + }, + { + "name": "offset", + "description": "Offset", + "schema": {"type": "number"}, + "optional": True, + "default": 0, + }, + ], + }, + ) def test_minimal(self, con120, dummy_backend): """Bare minimum: just start a job, no parameters/arguments""" @@ -1124,7 +1151,7 @@ def test_basic_parameterization(self, con120, dummy_backend, parameter_defaults, @pytest.fixture def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: job_manager = MultiBackendJobManager(root_dir=tmp_path / "job_mgr_root") - job_manager.add_backend("dummy", connection=dummy_backend.connection) + job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1) return job_manager def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): @@ -1143,6 +1170,8 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job "sleep": dirty_equals.IsInt(gt=1), "start_job call": 3, "job start": 3, + "job started running": 3, + "job finished": 3, } ) assert set(job_db.read().status) == {"finished"} @@ -1244,6 +1273,7 @@ def test_udp_job_manager_parameter_handling( "sleep": dirty_equals.IsInt(gt=1), "start_job call": 3, "job start": 3, + "job finished": 3, } ) assert set(job_db.read().status) == {"finished"} @@ -1286,3 +1316,78 @@ def test_udp_job_manager_parameter_handling( "status": "finished", }, } + + def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): + job_starter = UDPJobFactory( + process_id="offset_poplygon", + namespace="https://remote.test/offset_poplygon.json", + parameter_defaults={"data": 123}, + ) + + df = geopandas.GeoDataFrame.from_features( + { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "id": "one", + "properties": {"offset": 11}, + "geometry": {"type": "Point", "coordinates": (1.0, 2.0)}, + }, + { + "type": "Feature", + "id": "two", + "properties": {"offset": 22}, + "geometry": {"type": "Point", "coordinates": (3.0, 4.0)}, + }, + ], + 
} + ) + + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 2, + "job start": 2, + "job finished": 2, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "offsetpoplygon1": { + "process_id": "offset_poplygon", + "namespace": "https://remote.test/offset_poplygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, + "offset": 11, + }, + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "offsetpoplygon1": { + "process_id": "offset_poplygon", + "namespace": "https://remote.test/offset_poplygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, + "offset": 22, + }, + "result": True, + } + }, + "status": "finished", + }, + } diff --git a/tests/internal/processes/test_parse.py b/tests/internal/processes/test_parse.py index 2ed106a61..ac8669d96 100644 --- a/tests/internal/processes/test_parse.py +++ b/tests/internal/processes/test_parse.py @@ -22,6 +22,20 @@ def test_schema_equality(): assert Schema({"type": "number"}) != Schema({"type": "string"}) +@pytest.mark.parametrize( + ["schema", "expected"], + [ + ({"type": "object", "subtype": "geojson"}, True), + ({"type": "object"}, False), + ({"subtype": "geojson"}, False), + ({"type": "object", "subtype": "vectorzz"}, False), + ], +) +def test_schema_accepts_geojson(schema, expected): + assert Schema(schema).accepts_geojson() == expected + assert Schema([{"type": "number"}, schema]).accepts_geojson() == expected + + def test_parameter(): p = Parameter.from_dict({ "name": "foo", diff --git a/tests/rest/test_testing.py b/tests/rest/test_testing.py new file mode 100644 index 000000000..8feb63aa0 --- /dev/null +++ b/tests/rest/test_testing.py @@ -0,0 +1,64 @@ +import pytest + +from openeo.rest._testing import DummyBackend + + +@pytest.fixture +def dummy_backend(requests_mock, con120): + return DummyBackend(requests_mock=requests_mock, connection=con120) + + +DUMMY_PG_ADD35 = { + "add35": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, +} + + +class TestDummyBackend: + def test_create_job(self, dummy_backend, con120): + assert dummy_backend.batch_jobs == {} + _ = con120.create_job(DUMMY_PG_ADD35) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": {"add35": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}, + "status": "created", + } + } + + def test_start_job(self, dummy_backend, con120): + job = con120.create_job(DUMMY_PG_ADD35) + assert dummy_backend.batch_jobs == { + "job-000": {"job_id": "job-000", "pg": DUMMY_PG_ADD35, "status": "created"}, + } + job.start() + assert dummy_backend.batch_jobs == { + "job-000": {"job_id": "job-000", "pg": DUMMY_PG_ADD35, "status": "finished"}, + } + + def test_job_status_updater_error(self, dummy_backend, con120): + dummy_backend.job_status_updater = lambda job_id, current_status: "error" + + job = con120.create_job(DUMMY_PG_ADD35) + assert dummy_backend.batch_jobs["job-000"]["status"] == "created" + job.start() + assert dummy_backend.batch_jobs["job-000"]["status"] == "error" + + @pytest.mark.parametrize("final", ["finished", "error"]) + def test_setup_simple_job_status_flow(self, dummy_backend, con120, 
final): + dummy_backend.setup_simple_job_status_flow(queued=2, running=3, final=final) + job = con120.create_job(DUMMY_PG_ADD35) + assert dummy_backend.batch_jobs["job-000"]["status"] == "created" + + # Note that first status update (to queued here) is triggered from `start()`, not `status()` like below + job.start() + assert dummy_backend.batch_jobs["job-000"]["status"] == "queued" + + # Now go through rest of status flow, through `status()` calls + assert job.status() == "queued" + assert job.status() == "running" + assert job.status() == "running" + assert job.status() == "running" + assert job.status() == final + assert job.status() == final + assert job.status() == final + assert job.status() == final From ff9c3f2128c868bdf11661a9a52967d90ced63b2 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 20:49:14 +0200 Subject: [PATCH 08/22] Issue #604/#644 test coverage for personal UDP mode and some related refactoring --- tests/extra/test_job_management.py | 202 +++++++++++++++++++---------- 1 file changed, 130 insertions(+), 72 deletions(-) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index c41663b39..dad0ca069 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,3 +1,4 @@ +import copy import json import re import threading @@ -38,8 +39,8 @@ @pytest.fixture -def con120(requests_mock) -> openeo.Connection: - requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0")) +def con(requests_mock) -> openeo.Connection: + requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0", udp=True)) con = openeo.Connection(OPENEO_BACKEND) return con @@ -1010,72 +1011,69 @@ def test_create_job_db(tmp_path, filename, expected): class TestUDPJobFactory: @pytest.fixture - def dummy_backend(self, requests_mock, con120) -> DummyBackend: - dummy = DummyBackend(requests_mock=requests_mock, connection=con120) + def dummy_backend(self, requests_mock, con) -> DummyBackend: + dummy = DummyBackend(requests_mock=requests_mock, connection=con) dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished") return dummy - @pytest.fixture(autouse=True) - def remote_process_definitions(self, requests_mock): - requests_mock.get( - "https://remote.test/3plus5.json", - json={ - "id": "3plus5", - "process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, + PG_3PLUS5 = { + "id": "3plus5", + "process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}, + } + PG_INCREMENT = { + "id": "increment", + "parameters": [ + {"name": "data", "description": "data", "schema": {"type": "number"}}, + { + "name": "increment", + "description": "increment", + "schema": {"type": "number"}, + "optional": True, + "default": 1, }, - ) - requests_mock.get( - "https://remote.test/increment.json", - json={ - "id": "increment", - "parameters": [ - {"name": "data", "description": "data", "schema": {"type": "number"}}, - { - "name": "increment", - "description": "increment", - "schema": {"type": "number"}, - "optional": True, - "default": 1, - }, - ], - "process_graph": { - "process_id": "add", - "arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}}, - "result": True, + ], + "process_graph": { + "process_id": "add", + "arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}}, + "result": True, + }, + } + PG_OFFSET_POLYGON = { + "id": "offset_polygon", + "parameters": [ + {"name": "data", 
"description": "data", "schema": {"type": "number"}}, + { + "name": "polygons", + "description": "polygons", + "schema": { + "title": "GeoJSON", + "type": "object", + "subtype": "geojson", }, }, - ) - requests_mock.get( - "https://remote.test/offset_poplygon.json", - json={ - "id": "offset_poplygon", - "parameters": [ - {"name": "data", "description": "data", "schema": {"type": "number"}}, - { - "name": "polygons", - "description": "polygons", - "schema": { - "title": "GeoJSON", - "type": "object", - "subtype": "geojson", - }, - }, - { - "name": "offset", - "description": "Offset", - "schema": {"type": "number"}, - "optional": True, - "default": 0, - }, - ], + { + "name": "offset", + "description": "Offset", + "schema": {"type": "number"}, + "optional": True, + "default": 0, }, - ) + ], + } - def test_minimal(self, con120, dummy_backend): + @pytest.fixture(autouse=True) + def remote_process_definitions(self, requests_mock) -> dict: + mocks = {} + for pg in [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON]: + process_id = pg["id"] + mocks[process_id] = requests_mock.get(f"https://remote.test/{process_id}.json", json=pg) + return mocks + + def test_minimal(self, con, dummy_backend, remote_process_definitions): """Bare minimum: just start a job, no parameters/arguments""" job_factory = UDPJobFactory(process_id="3plus5", namespace="https://remote.test/3plus5.json") - job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con120) + job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) assert isinstance(job, BatchJob) assert dummy_backend.batch_jobs == { "job-000": { @@ -1092,11 +1090,13 @@ def test_minimal(self, con120, dummy_backend): } } - def test_basic(self, con120, dummy_backend): + assert remote_process_definitions["3plus5"].call_count == 1 + + def test_basic(self, con, dummy_backend, remote_process_definitions): """Basic parameterized UDP job generation""" job_factory = UDPJobFactory(process_id="increment", namespace="https://remote.test/increment.json") - job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con120) + job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con) assert isinstance(job, BatchJob) assert dummy_backend.batch_jobs == { "job-000": { @@ -1112,6 +1112,7 @@ def test_basic(self, con120, dummy_backend): "status": "created", } } + assert remote_process_definitions["increment"].call_count == 1 @pytest.mark.parametrize( ["parameter_defaults", "row", "expected_arguments"], @@ -1122,7 +1123,7 @@ def test_basic(self, con120, dummy_backend): ({"increment": 5}, {"data": 123, "increment": 1000}, {"data": 123, "increment": 1000}), ], ) - def test_basic_parameterization(self, con120, dummy_backend, parameter_defaults, row, expected_arguments): + def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, row, expected_arguments): """Basic parameterized UDP job generation""" job_factory = UDPJobFactory( process_id="increment", @@ -1130,7 +1131,7 @@ def test_basic_parameterization(self, con120, dummy_backend, parameter_defaults, parameter_defaults=parameter_defaults, ) - job = job_factory.start_job(row=pd.Series(row), connection=con120) + job = job_factory.start_job(row=pd.Series(row), connection=con) assert isinstance(job, BatchJob) assert dummy_backend.batch_jobs == { "job-000": { @@ -1154,7 +1155,9 @@ def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1) return job_manager - def 
test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): + def test_with_job_manager_remote_basic( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): job_starter = UDPJobFactory( process_id="increment", namespace="https://remote.test/increment.json", @@ -1175,6 +1178,7 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job } ) assert set(job_db.read().status) == {"finished"} + assert remote_process_definitions["increment"].call_count == 1 assert dummy_backend.batch_jobs == { "job-000": { @@ -1247,7 +1251,7 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job ), ], ) - def test_udp_job_manager_parameter_handling( + def test_with_job_manager_remote_parameter_handling( self, tmp_path, requests_mock, @@ -1317,10 +1321,10 @@ def test_udp_job_manager_parameter_handling( }, } - def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): + def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): job_starter = UDPJobFactory( - process_id="offset_poplygon", - namespace="https://remote.test/offset_poplygon.json", + process_id="offset_polygon", + namespace="https://remote.test/offset_polygon.json", parameter_defaults={"data": 123}, ) @@ -1361,9 +1365,9 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, "job-000": { "job_id": "job-000", "pg": { - "offsetpoplygon1": { - "process_id": "offset_poplygon", - "namespace": "https://remote.test/offset_poplygon.json", + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", "arguments": { "data": 123, "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, @@ -1377,9 +1381,9 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, "job-001": { "job_id": "job-001", "pg": { - "offsetpoplygon1": { - "process_id": "offset_poplygon", - "namespace": "https://remote.test/offset_poplygon.json", + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", "arguments": { "data": 123, "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, @@ -1391,3 +1395,57 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, "status": "finished", }, } + + def test_with_job_manager_udp_basic( + self, tmp_path, requests_mock, con, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): + # make deep copy + udp = copy.deepcopy(self.PG_INCREMENT) + # Register personal UDP + increment_udp_mock = requests_mock.get(con.build_url("/process_graphs/increment"), json=udp) + + job_starter = UDPJobFactory( + process_id="increment", + # No namespace to trigger personal UDP mode + namespace=None, + parameter_defaults={"increment": 5}, + ) + assert increment_udp_mock.call_count == 0 + + df = pd.DataFrame({"data": [3, 5]}) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "start_job call": 2, + "job finished": 2, + } + ) + assert increment_udp_mock.call_count == 2 + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "arguments": {"data": 3, "increment": 5}, + 
"result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "arguments": {"data": 5, "increment": 5}, + "result": True, + } + }, + "status": "finished", + }, + } From 632e23914938257940d5548e74edb7acd5563df6 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 20:58:18 +0200 Subject: [PATCH 09/22] Issue #604/#644 test coverage for geometry handling after resume --- tests/extra/test_job_management.py | 84 ++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index dad0ca069..b4e05c365 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1396,6 +1396,90 @@ def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_b }, } + def test_with_job_manager_remote_geometry_with_csv_persistence( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock + ): + """Test if geometry handling works properly after resuming from CSV serialized job db.""" + job_starter = UDPJobFactory( + process_id="offset_polygon", + namespace="https://remote.test/offset_polygon.json", + parameter_defaults={"data": 123}, + ) + + df = geopandas.GeoDataFrame.from_features( + { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "id": "one", + "properties": {"offset": 11}, + "geometry": {"type": "Point", "coordinates": (1.0, 2.0)}, + }, + { + "type": "Feature", + "id": "two", + "properties": {"offset": 22}, + "geometry": {"type": "Point", "coordinates": (3.0, 4.0)}, + }, + ], + } + ) + + # Persist the GeoDataFrame to CSV + job_db_path = tmp_path / "jobs.csv" + _ = CsvJobDatabase(job_db_path).initialize_from_df(df) + assert job_db_path.exists() + + # Resume from persisted CSV + job_db = CsvJobDatabase(job_db_path) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "sleep": dirty_equals.IsInt(gt=1), + "start_job call": 2, + "job start": 2, + "job finished": 2, + } + ) + assert set(job_db.read().status) == {"finished"} + + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [1.0, 2.0]}, + "offset": 11, + }, + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "offsetpolygon1": { + "process_id": "offset_polygon", + "namespace": "https://remote.test/offset_polygon.json", + "arguments": { + "data": 123, + "polygons": {"type": "Point", "coordinates": [3.0, 4.0]}, + "offset": 22, + }, + "result": True, + } + }, + "status": "finished", + }, + } + def test_with_job_manager_udp_basic( self, tmp_path, requests_mock, con, dummy_backend, job_manager, sleep_mock, remote_process_definitions ): From 5b1e8fa5e372b1b5954a7c83e9a2afa8053ad968 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 23:00:52 +0200 Subject: [PATCH 10/22] MultiBackendJobManager: fix another SettingWithCopyWarning related bug seems like pandas is handy to build foot guns maybe related to #641 --- openeo/extra/job_management.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 0697201ab..d9339cbab 100644 --- 
a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -505,12 +505,11 @@ def _job_update_loop( backend_load = per_backend.get(backend_name, 0) if backend_load < self.backends[backend_name].parallel_jobs: to_add = self.backends[backend_name].parallel_jobs - backend_load - to_launch = not_started.iloc[0:to_add] - for i in to_launch.index: + for i in not_started.index[0:to_add]: self._launch_job(start_job, df=not_started, i=i, backend_name=backend_name, stats=stats) stats["job launch"] += 1 - job_db.persist(to_launch) + job_db.persist(not_started.loc[i : i + 1]) stats["job_db persist"] += 1 def _launch_job(self, start_job, df, i, backend_name, stats: Optional[dict] = None): From fd9fdb8525003025bd42bf7cc24406c739233bc6 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 23:06:05 +0200 Subject: [PATCH 11/22] Issue #604/#644 test coverage for resuming: also parquet --- tests/extra/test_job_management.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index b4e05c365..b2f3bd5bb 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1396,8 +1396,15 @@ def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_b }, } - def test_with_job_manager_remote_geometry_with_csv_persistence( - self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock + @pytest.mark.parametrize( + ["db_class"], + [ + (CsvJobDatabase,), + (ParquetJobDatabase,), + ], + ) + def test_with_job_manager_remote_geometry_after_resume( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, db_class ): """Test if geometry handling works properly after resuming from CSV serialized job db.""" job_starter = UDPJobFactory( @@ -1426,13 +1433,13 @@ def test_with_job_manager_remote_geometry_with_csv_persistence( } ) - # Persist the GeoDataFrame to CSV - job_db_path = tmp_path / "jobs.csv" - _ = CsvJobDatabase(job_db_path).initialize_from_df(df) + # Persist the job db to CSV/Parquet/... 
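+        # (A minimal sketch of the round-trip exercised below, assuming db_class
+        #  is one of the parametrized CsvJobDatabase/ParquetJobDatabase classes:
+        #      db_class(path).initialize_from_df(df)   # write the job db to disk
+        #      job_db = db_class(path)                 # resume from the same file
+        #  The geometry column must survive this serialization round-trip.)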
+ job_db_path = tmp_path / "jobs.db" + _ = db_class(job_db_path).initialize_from_df(df) assert job_db_path.exists() - # Resume from persisted CSV - job_db = CsvJobDatabase(job_db_path) + # Resume from persisted job db + job_db = db_class(job_db_path) stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) assert stats == dirty_equals.IsPartialDict( From ade5258ef24ab538c8987e73720a5556dec4079a Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 11 Oct 2024 23:23:16 +0200 Subject: [PATCH 12/22] Issue #604/#644 fix title/description --- openeo/extra/job_management.py | 4 ++-- openeo/rest/_testing.py | 10 ++++++++-- tests/extra/test_job_management.py | 3 +++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index d9339cbab..4bfdceefe 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1024,9 +1024,9 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: cube = connection.datacube_from_process(process_id=self._process_id, namespace=self._namespace, **arguments) - title = row.get("title", f"UDP {self._process_id!r} with {repr_truncate(parameters)}") + title = row.get("title", f"Process {self._process_id!r} with {repr_truncate(arguments)}") description = row.get( - "description", f"UDP {self._process_id!r} (namespace {self._namespace}) with {parameters}" + "description", f"Process {self._process_id!r} (namespace {self._namespace}) with {arguments}" ) job = connection.create_job(cube, title=title, description=description) diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py index 89f9e30f9..2f8209dc2 100644 --- a/openeo/rest/_testing.py +++ b/openeo/rest/_testing.py @@ -27,6 +27,7 @@ class DummyBackend: "next_result", "next_validation_errors", "job_status_updater", + "extra_job_metadata_fields", ) # Default result (can serve both as JSON or binary data) @@ -39,6 +40,7 @@ def __init__(self, requests_mock, connection: Connection): self.validation_requests = [] self.next_result = self.DEFAULT_RESULT self.next_validation_errors = [] + self.extra_job_metadata_fields = [] # Job status update hook: # callable that is called on starting a job, and getting job metadata @@ -81,9 +83,13 @@ def _handle_post_result(self, request, context): def _handle_post_jobs(self, request, context): """handler of `POST /jobs` (create batch job)""" - pg = request.json()["process"]["process_graph"] + post_data = request.json() + pg = post_data["process"]["process_graph"] job_id = f"job-{len(self.batch_jobs):03d}" - self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"} + job_data = {"job_id": job_id, "pg": pg, "status": "created"} + for field in self.extra_job_metadata_fields: + job_data[field] = post_data.get(field) + self.batch_jobs[job_id] = job_data context.status_code = 201 context.headers["openeo-identifier"] = job_id diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index b2f3bd5bb..e103b2e02 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1094,6 +1094,7 @@ def test_minimal(self, con, dummy_backend, remote_process_definitions): def test_basic(self, con, dummy_backend, remote_process_definitions): """Basic parameterized UDP job generation""" + dummy_backend.extra_job_metadata_fields = ["title", "description"] job_factory = UDPJobFactory(process_id="increment", namespace="https://remote.test/increment.json") job = job_factory.start_job(row=pd.Series({"data": 
123}), connection=con) @@ -1110,6 +1111,8 @@ def test_basic(self, con, dummy_backend, remote_process_definitions): } }, "status": "created", + "title": "Process 'increment' with {'data': 123, 'increment': 1}", + "description": "Process 'increment' (namespace https://remote.test/increment.json) with {'data': 123, 'increment': 1}", } } assert remote_process_definitions["increment"].call_count == 1 From d29d3eeaff856fad33fec55539b98aa4ef1db86c Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 14 Oct 2024 14:47:38 +0200 Subject: [PATCH 13/22] Issue #604/#644 changelog entry and usage example --- CHANGELOG.md | 1 + openeo/extra/job_management.py | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e91aaabfc..6021bf399 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension. ([#635](https://github.com/Open-EO/openeo-python-client/issues/635)) - `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the job run ([#645](https://github.com/Open-EO/openeo-python-client/issues/645)) +- Added `UDPJobFactory` to be used as `start_job` callable with `MultiBackendJobManager` to create multiple jobs from a single parameterized process (e.g. a UDP or remote process definition) ([#604](https://github.com/Open-EO/openeo-python-client/issues/604)) ### Changed diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 4bfdceefe..ec3e24dcc 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -944,6 +944,32 @@ class UDPJobFactory: Batch job factory based on a parameterized process definition (e.g a user-defined process (UDP) or a remote process definition), to be used together with :py:class:`MultiBackendJobManager`. + + Usage example with a remote process definition: + + .. code-block:: python + + from openeo.extra.job_management import ( + MultiBackendJobManager, + create_job_db, + UDPJobFactory, + ) + + # Job creator, based on a parameterized openEO process definition + job_starter = UDPJobFactory( + process_id="my_process", + namespace="https://example.com/my_process.json", + ) + + # Initialize job database from dataframe, with parameters to use. + df = pd.DataFrame(...) + job_db = create_job_db("jobs.csv").initialize_from_df(df) + + # Create and run job manager + job_manager = MultiBackendJobManager(...) + job_manager.run_jobs(job_db=job_db, start_job=job_starter) + + .. versionadded:: 0.33.0 """ def __init__( @@ -954,6 +980,7 @@ def __init__( parameter_defaults: Optional[dict] = None, parameter_column_map: Optional[dict] = None, ): + # TODO: allow process_id to be None too? 
when remote process definition fully comes from URL self._process_id = process_id self._namespace = namespace self._parameter_defaults = parameter_defaults or {} From a92e47f6b2d33cc707bc3938a1c71fd05723296c Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 14 Oct 2024 15:22:02 +0200 Subject: [PATCH 14/22] Issue #604/#644 add process_id to docs --- docs/cookbook/job_manager.rst | 2 ++ openeo/extra/job_management.py | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 171cc3fb7..915cf18b5 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -14,3 +14,5 @@ Multi Backend Job Manager .. autoclass:: openeo.extra.job_management.CsvJobDatabase .. autoclass:: openeo.extra.job_management.ParquetJobDatabase + +.. autoclass:: openeo.extra.job_management.UDPJobFactory diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index ec3e24dcc..20825b841 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -969,6 +969,15 @@ class UDPJobFactory: job_manager = MultiBackendJobManager(...) job_manager.run_jobs(job_db=job_db, start_job=job_starter) + :param process_id: (optional) openEO process identifier. + Can be omitted when working with a remote process definition + given as URL in the ``namespace`` parameter. + :param namespace: (optional) openEO process namespace. + Typically used to provide a URL to a remote process definition. + :param parameter_defaults: Default values for process parameters, + to be used when not provided from the dataframe row in + :py:meth:`MultiBackendJobManager.run_jobs`. + .. versionadded:: 0.33.0 """ From 84ee8ea521fcdbeee8729bfe4804af149c323ae8 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 14 Oct 2024 15:32:13 +0200 Subject: [PATCH 15/22] Issue #604/#644 UDPJobFactory: make process_id optional (if namespace is given) --- openeo/extra/job_management.py | 17 ++++---- openeo/internal/processes/parse.py | 5 +++ tests/extra/test_job_management.py | 56 +++++++++++++++++++++++++- tests/internal/processes/test_parse.py | 7 ++++ 4 files changed, 75 insertions(+), 10 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 20825b841..d18dd6a27 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -957,7 +957,6 @@ class UDPJobFactory: # Job creator, based on a parameterized openEO process definition job_starter = UDPJobFactory( - process_id="my_process", namespace="https://example.com/my_process.json", ) @@ -983,13 +982,14 @@ class UDPJobFactory: def __init__( self, - process_id: str, *, + process_id: Optional[str] = None, namespace: Union[str, None] = None, parameter_defaults: Optional[dict] = None, parameter_column_map: Optional[dict] = None, ): - # TODO: allow process_id to be None too? 
when remote process definition fully comes from URL + if process_id is None and namespace is None: + raise ValueError("At least one of `process_id` and `namespace` should be provided.") self._process_id = process_id self._namespace = namespace self._parameter_defaults = parameter_defaults or {} @@ -1024,6 +1024,7 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: """ process_definition = self._get_process_definition(connection=connection) + process_id = process_definition.id parameters = process_definition.parameters or [] if self._parameter_column_map is None: @@ -1046,7 +1047,7 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: # Skip optional parameters without any fallback default value continue else: - raise ValueError(f"Missing required parameter {param_name !r} for process {self._process_id!r}") + raise ValueError(f"Missing required parameter {param_name !r} for process {process_id!r}") # Prepare some values/dtypes for JSON encoding if isinstance(value, numpy.integer): @@ -1058,12 +1059,10 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: arguments[param_name] = value - cube = connection.datacube_from_process(process_id=self._process_id, namespace=self._namespace, **arguments) + cube = connection.datacube_from_process(process_id=process_id, namespace=self._namespace, **arguments) - title = row.get("title", f"Process {self._process_id!r} with {repr_truncate(arguments)}") - description = row.get( - "description", f"Process {self._process_id!r} (namespace {self._namespace}) with {arguments}" - ) + title = row.get("title", f"Process {process_id!r} with {repr_truncate(arguments)}") + description = row.get("description", f"Process {process_id!r} (namespace {self._namespace}) with {arguments}") job = connection.create_job(cube, title=title, description=description) return job diff --git a/openeo/internal/processes/parse.py b/openeo/internal/processes/parse.py index 0d62fe4cf..1e22ba6bc 100644 --- a/openeo/internal/processes/parse.py +++ b/openeo/internal/processes/parse.py @@ -156,4 +156,9 @@ def parse_remote_process_definition(namespace: str, process_id: Optional[str] = raise LookupError(f"Process {process_id!r} not found in process listing {namespace!r}") (data,) = processes + # Some final validation. 
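+    # (e.g. a namespace URL pointing at a single process document with
+    #  {"id": "other_process", ...} while "ndvi" was requested should fail
+    #  loudly with a LookupError instead of silently returning the wrong process)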
+ assert "id" in data, "Process definition should at least have an 'id' field" + if process_id is not None and data["id"] != process_id: + raise LookupError(f"Expected process id {process_id!r}, but found {data['id']!r}") + return Process.from_dict(data) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index e103b2e02..ea0a13f10 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1064,7 +1064,9 @@ def dummy_backend(self, requests_mock, con) -> DummyBackend: @pytest.fixture(autouse=True) def remote_process_definitions(self, requests_mock) -> dict: mocks = {} - for pg in [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON]: + processes = [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON] + mocks["_all"] = requests_mock.get("https://remote.test/_all", json={"processes": processes, "links": []}) + for pg in processes: process_id = pg["id"] mocks[process_id] = requests_mock.get(f"https://remote.test/{process_id}.json", json=pg) return mocks @@ -1151,6 +1153,58 @@ def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, ro } } + @pytest.mark.parametrize( + ["process_id", "namespace", "expected"], + [ + ( + # Classic UDP reference + "3plus5", + None, + {"process_id": "3plus5"}, + ), + ( + # Remote process definition (with "redundant" process_id) + "3plus5", + "https://remote.test/3plus5.json", + {"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"}, + ), + ( + # Remote process definition with just namespace (process_id should be inferred from that) + None, + "https://remote.test/3plus5.json", + {"process_id": "3plus5", "namespace": "https://remote.test/3plus5.json"}, + ), + ( + # Remote process definition from listing + "3plus5", + "https://remote.test/_all", + {"process_id": "3plus5", "namespace": "https://remote.test/_all"}, + ), + ], + ) + def test_process_references_in_constructor( + self, con, requests_mock, dummy_backend, remote_process_definitions, process_id, namespace, expected + ): + """Various ways to provide process references in the constructor""" + + # Register personal UDP + requests_mock.get(con.build_url("/process_graphs/3plus5"), json=self.PG_3PLUS5) + + job_factory = UDPJobFactory(process_id=process_id, namespace=namespace) + + job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) + assert isinstance(job, BatchJob) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": {"3plus51": {**expected, "arguments": {}, "result": True}}, + "status": "created", + } + } + + def test_no_process_id_nor_namespace(self): + with pytest.raises(ValueError, match="At least one of `process_id` and `namespace` should be provided"): + _ = UDPJobFactory() @pytest.fixture def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: diff --git a/tests/internal/processes/test_parse.py b/tests/internal/processes/test_parse.py index ac8669d96..48405e537 100644 --- a/tests/internal/processes/test_parse.py +++ b/tests/internal/processes/test_parse.py @@ -220,3 +220,10 @@ def test_parse_remote_process_definition_listing(requests_mock): assert process.returns is None assert process.description is None assert process.summary is None + + +def test_parse_remote_process_definition_inconsistency(requests_mock): + url = "https://example.com/ndvi.json" + requests_mock.get(url, json={"id": "nnddvvii"}) + with pytest.raises(LookupError, match="Expected process id 'ndvi', but found 'nnddvvii'"): + _ = 
parse_remote_process_definition(url, process_id="ndvi") From ddc9ee599656d22f3a52803008a95581aba962d2 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 14 Oct 2024 15:57:47 +0200 Subject: [PATCH 16/22] Issue #604/#644 replace lru_cache trick with cleaner cache --- openeo/extra/job_management.py | 12 ++++++------ tests/extra/test_job_management.py | 2 ++ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index d18dd6a27..d277f058d 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -2,7 +2,6 @@ import collections import contextlib import datetime -import functools import json import logging import re @@ -27,7 +26,7 @@ parse_remote_process_definition, ) from openeo.rest import OpenEoApiError -from openeo.util import deep_get, repr_truncate, rfc3339 +from openeo.util import LazyLoadCache, deep_get, repr_truncate, rfc3339 _log = logging.getLogger(__name__) @@ -994,11 +993,15 @@ def __init__( self._namespace = namespace self._parameter_defaults = parameter_defaults or {} self._parameter_column_map = parameter_column_map + self._cache = LazyLoadCache() def _get_process_definition(self, connection: Connection) -> Process: if isinstance(self._namespace, str) and re.match("https?://", self._namespace): # Remote process definition handling - return self._get_remote_process_definition() + return self._cache.get( + key=("remote_process_definition", self._namespace, self._process_id), + load=lambda: parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id), + ) elif self._namespace is None: # Handling of a user-specific UDP udp_raw = connection.user_defined_process(self._process_id).describe() @@ -1008,9 +1011,6 @@ def _get_process_definition(self, connection: Connection) -> Process: f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}" ) - @functools.lru_cache() - def _get_remote_process_definition(self) -> Process: - return parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id) def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: """ diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index ea0a13f10..fae8cc2d5 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1235,6 +1235,8 @@ def test_with_job_manager_remote_basic( } ) assert set(job_db.read().status) == {"finished"} + + # Verify caching of HTTP request of remote process definition assert remote_process_definitions["increment"].call_count == 1 assert dummy_backend.batch_jobs == { From e22a791d142b08c111455bcb91648d0b2638607c Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 14 Oct 2024 16:08:14 +0200 Subject: [PATCH 17/22] Issue #604/#644 further documentation finetuning --- docs/cookbook/job_manager.rst | 2 ++ openeo/extra/job_management.py | 36 +++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 915cf18b5..58dd0f892 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -16,3 +16,5 @@ Multi Backend Job Manager .. autoclass:: openeo.extra.job_management.ParquetJobDatabase .. 
autoclass:: openeo.extra.job_management.UDPJobFactory + :members: + :special-members: __call__ diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index d277f058d..b22ff41db 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -967,6 +967,29 @@ class UDPJobFactory: job_manager = MultiBackendJobManager(...) job_manager.run_jobs(job_db=job_db, start_job=job_starter) + The factory will take care of filling in the process parameters + based on matching column names in dataframe from the job database, + with some additional override/fallback options: + + - When provided, ``parameter_column_map`` will be consulted + for resolving a parameter name (key) to a desired column name (value). + - One common case is handled automatically as convenience functionality. + + When: + + - ``parameter_column_map`` is not provided (or set to ``None``), + - and there is a *single parameter* that accepts inline GeoJSON geometries, + - and the dataframe is a GeoPandas dataframe with a *single geometry* column, + + then this parameter and this geometries column will be linked automatically. + + - If a parameter can not be matched with a column by name as described above, + a default value will be picked, + first by looking in ``parameter_defaults`` (if provided), + and then by looking up the default value from the parameter schema in the process definition. + - Finally if no (default) value can be determined and the parameter + is not flagged as optional, an error will be raised. + :param process_id: (optional) openEO process identifier. Can be omitted when working with a remote process definition given as URL in the ``namespace`` parameter. @@ -975,6 +998,10 @@ class UDPJobFactory: :param parameter_defaults: Default values for process parameters, to be used when not provided from the dataframe row in :py:meth:`MultiBackendJobManager.run_jobs`. + :param parameter_column_map: Optional overrides + for linking parameters to dataframe columns: + mapping of process parameter names as key + to dataframe column names as value. .. versionadded:: 0.33.0 """ @@ -1014,13 +1041,12 @@ def _get_process_definition(self, connection: Connection) -> Process: def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: """ - Implementation of the `start_job` callable interface for MultiBackendJobManager: - Create and start a job based on given dataframe row + Implementation of the ``start_job`` callable interface + of :py:meth:`MultiBackendJobManager.run_jobs` + to create a job based on given dataframe row :param row: The row in the pandas dataframe that stores the jobs state and other tracked data. :param connection: The connection to the backend. - - :return: The started job. 
""" process_definition = self._get_process_definition(connection=connection) @@ -1068,7 +1094,7 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: return job def __call__(self, *arg, **kwargs) -> BatchJob: - """Syntactic sugar for calling `start_job` directly.""" + """Syntactic sugar for calling :py:meth:`start_job`.""" return self.start_job(*arg, **kwargs) @staticmethod From 130db87af709e0480f2942d15b3d53dbcc0417ca Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 14 Oct 2024 17:01:54 +0200 Subject: [PATCH 18/22] Issue #604/#644 add tests for parameter_column_map --- tests/extra/test_job_management.py | 53 ++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index fae8cc2d5..9ea9067ac 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1599,3 +1599,56 @@ def test_with_job_manager_udp_basic( "status": "finished", }, } + + def test_with_job_manager_parameter_column_map( + self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions + ): + job_starter = UDPJobFactory( + process_id="increment", + namespace="https://remote.test/increment.json", + parameter_column_map={"data": "numberzzz", "increment": "add_thiz"}, + ) + + df = pd.DataFrame( + { + "data": [1, 2], + "increment": [-1, -2], + "numberzzz": [3, 5], + "add_thiz": [100, 200], + } + ) + job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df) + + stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter) + assert stats == dirty_equals.IsPartialDict( + { + "start_job call": 2, + "job finished": 2, + } + ) + assert dummy_backend.batch_jobs == { + "job-000": { + "job_id": "job-000", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 3, "increment": 100}, + "result": True, + } + }, + "status": "finished", + }, + "job-001": { + "job_id": "job-001", + "pg": { + "increment1": { + "process_id": "increment", + "namespace": "https://remote.test/increment.json", + "arguments": {"data": 5, "increment": 200}, + "result": True, + } + }, + "status": "finished", + }, + } From 2667733563e9eeac00879b0d316d858b4c2dcb5a Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 14 Oct 2024 17:15:29 +0200 Subject: [PATCH 19/22] Issue #604/#644 add some todo note for furtherdevelopment ideas --- openeo/extra/job_management.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index b22ff41db..1125f7e2e 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1006,6 +1006,9 @@ class UDPJobFactory: .. versionadded:: 0.33.0 """ + # TODO: find a better class name (e.g. eliminate over-specificity of "UDP", + # or avoid "factory" as technical mumbo-jumbo)? + def __init__( self, *, @@ -1048,6 +1051,8 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob: :param row: The row in the pandas dataframe that stores the jobs state and other tracked data. :param connection: The connection to the backend. 
""" + # TODO: refactor out some methods, for better reuse and decoupling: + # `get_arguments()` (to build the arguments dictionary), `get_cube()` (to create the cube), process_definition = self._get_process_definition(connection=connection) process_id = process_definition.id From 1917a7340d470b741e9fc9303c51b71a25d67805 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 16 Oct 2024 10:14:01 +0200 Subject: [PATCH 20/22] Issue #604/#644 finetune docs based on review --- openeo/extra/job_management.py | 48 ++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 1125f7e2e..84563f47e 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -940,9 +940,13 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = class UDPJobFactory: """ - Batch job factory based on a parameterized process definition - (e.g a user-defined process (UDP) or a remote process definition), - to be used together with :py:class:`MultiBackendJobManager`. + Batch job creator + (to be used together with :py:class:`MultiBackendJobManager`) + that takes a parameterized openEO process definition + (e.g a user-defined process (UDP) or a remote openEO process definition), + and creates a batch job + for each row of the dataframe managed by the :py:class:`MultiBackendJobManager` + by filling in the process parameters with corresponding row values. Usage example with a remote process definition: @@ -954,13 +958,24 @@ class UDPJobFactory: UDPJobFactory, ) - # Job creator, based on a parameterized openEO process definition + # Job creator, based on a parameterized openEO process + # (specified by the remote process definition at given URL) + # which has, say, parameters "start_date" and "bands" for example. job_starter = UDPJobFactory( namespace="https://example.com/my_process.json", + parameter_defaults={ + # Default value for the "bands" parameter + # (to be used when not available in the dataframe) + "bands": ["B02", "B03"], + }, ) - # Initialize job database from dataframe, with parameters to use. - df = pd.DataFrame(...) + # Initialize job database from a dataframe, + # with desired parameter values to fill in. + df = pd.DataFrame({ + "start_date": ["2021-01-01", "2021-02-01", "2021-03-01"], + ... + }) job_db = create_job_db("jobs.csv").initialize_from_df(df) # Create and run job manager @@ -968,11 +983,16 @@ class UDPJobFactory: job_manager.run_jobs(job_db=job_db, start_job=job_starter) The factory will take care of filling in the process parameters - based on matching column names in dataframe from the job database, - with some additional override/fallback options: + based on matching column names in the dataframe from the job database + (like "start_date" in the example above). + + This intuitive name-based matching should cover most use cases, + but for some more advanced use cases, there are additional options + to provide overrides and fallbacks: - When provided, ``parameter_column_map`` will be consulted - for resolving a parameter name (key) to a desired column name (value). + for resolving a process parameter name (key in the dictionary) + to a desired dataframe column name (corresponding value). - One common case is handled automatically as convenience functionality. When: @@ -992,14 +1012,14 @@ class UDPJobFactory: :param process_id: (optional) openEO process identifier. 
Can be omitted when working with a remote process definition - given as URL in the ``namespace`` parameter. + that is fully defined with a URL in the ``namespace`` parameter. :param namespace: (optional) openEO process namespace. Typically used to provide a URL to a remote process definition. - :param parameter_defaults: Default values for process parameters, - to be used when not provided from the dataframe row in - :py:meth:`MultiBackendJobManager.run_jobs`. + :param parameter_defaults: (optional) default values for process parameters, + to be used when not available in the dataframe managed by + :py:class:`MultiBackendJobManager`. :param parameter_column_map: Optional overrides - for linking parameters to dataframe columns: + for linking process parameters to dataframe columns: mapping of process parameter names as key to dataframe column names as value. From 8ffa2a61a873e89df268286fd156288cf11de47f Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 16 Oct 2024 10:27:13 +0200 Subject: [PATCH 21/22] Issue #604/#644 Rename UDPJobFactory to ProcessBasedJobCreator --- CHANGELOG.md | 2 +- docs/cookbook/job_manager.rst | 2 +- openeo/extra/job_management.py | 10 +++------- tests/extra/test_job_management.py | 26 +++++++++++++------------- 4 files changed, 18 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6021bf399..873bd90fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension. ([#635](https://github.com/Open-EO/openeo-python-client/issues/635)) - `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the job run ([#645](https://github.com/Open-EO/openeo-python-client/issues/645)) -- Added `UDPJobFactory` to be used as `start_job` callable with `MultiBackendJobManager` to create multiple jobs from a single parameterized process (e.g. a UDP or remote process definition) ([#604](https://github.com/Open-EO/openeo-python-client/issues/604)) +- Added `ProcessBasedJobCreator` to be used as `start_job` callable with `MultiBackendJobManager` to create multiple jobs from a single parameterized process (e.g. a UDP or remote process definition) ([#604](https://github.com/Open-EO/openeo-python-client/issues/604)) ### Changed diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index 58dd0f892..c505e44e5 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -15,6 +15,6 @@ Multi Backend Job Manager .. autoclass:: openeo.extra.job_management.ParquetJobDatabase -.. autoclass:: openeo.extra.job_management.UDPJobFactory +.. 
autoclass:: openeo.extra.job_management.ProcessBasedJobCreator :members: :special-members: __call__ diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 84563f47e..09dc5bdef 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -938,7 +938,7 @@ def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = return job_db -class UDPJobFactory: +class ProcessBasedJobCreator: """ Batch job creator (to be used together with :py:class:`MultiBackendJobManager`) @@ -955,13 +955,13 @@ class UDPJobFactory: from openeo.extra.job_management import ( MultiBackendJobManager, create_job_db, - UDPJobFactory, + ProcessBasedJobCreator, ) # Job creator, based on a parameterized openEO process # (specified by the remote process definition at given URL) # which has, say, parameters "start_date" and "bands" for example. - job_starter = UDPJobFactory( + job_starter = ProcessBasedJobCreator( namespace="https://example.com/my_process.json", parameter_defaults={ # Default value for the "bands" parameter @@ -1025,10 +1025,6 @@ class UDPJobFactory: .. versionadded:: 0.33.0 """ - - # TODO: find a better class name (e.g. eliminate over-specificity of "UDP", - # or avoid "factory" as technical mumbo-jumbo)? - def __init__( self, *, diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 9ea9067ac..c9a612af9 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -30,7 +30,7 @@ CsvJobDatabase, MultiBackendJobManager, ParquetJobDatabase, - UDPJobFactory, + ProcessBasedJobCreator, create_job_db, get_job_db, ) @@ -1009,7 +1009,7 @@ def test_create_job_db(tmp_path, filename, expected): assert path.exists() -class TestUDPJobFactory: +class TestProcessBasedJobCreator: @pytest.fixture def dummy_backend(self, requests_mock, con) -> DummyBackend: dummy = DummyBackend(requests_mock=requests_mock, connection=con) @@ -1073,7 +1073,7 @@ def remote_process_definitions(self, requests_mock) -> dict: def test_minimal(self, con, dummy_backend, remote_process_definitions): """Bare minimum: just start a job, no parameters/arguments""" - job_factory = UDPJobFactory(process_id="3plus5", namespace="https://remote.test/3plus5.json") + job_factory = ProcessBasedJobCreator(process_id="3plus5", namespace="https://remote.test/3plus5.json") job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) assert isinstance(job, BatchJob) @@ -1097,7 +1097,7 @@ def test_minimal(self, con, dummy_backend, remote_process_definitions): def test_basic(self, con, dummy_backend, remote_process_definitions): """Basic parameterized UDP job generation""" dummy_backend.extra_job_metadata_fields = ["title", "description"] - job_factory = UDPJobFactory(process_id="increment", namespace="https://remote.test/increment.json") + job_factory = ProcessBasedJobCreator(process_id="increment", namespace="https://remote.test/increment.json") job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con) assert isinstance(job, BatchJob) @@ -1130,7 +1130,7 @@ def test_basic(self, con, dummy_backend, remote_process_definitions): ) def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, row, expected_arguments): """Basic parameterized UDP job generation""" - job_factory = UDPJobFactory( + job_factory = ProcessBasedJobCreator( process_id="increment", namespace="https://remote.test/increment.json", parameter_defaults=parameter_defaults, @@ -1190,7 +1190,7 @@ def 
test_process_references_in_constructor( # Register personal UDP requests_mock.get(con.build_url("/process_graphs/3plus5"), json=self.PG_3PLUS5) - job_factory = UDPJobFactory(process_id=process_id, namespace=namespace) + job_factory = ProcessBasedJobCreator(process_id=process_id, namespace=namespace) job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con) assert isinstance(job, BatchJob) @@ -1204,7 +1204,7 @@ def test_process_references_in_constructor( def test_no_process_id_nor_namespace(self): with pytest.raises(ValueError, match="At least one of `process_id` and `namespace` should be provided"): - _ = UDPJobFactory() + _ = ProcessBasedJobCreator() @pytest.fixture def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: @@ -1215,7 +1215,7 @@ def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager: def test_with_job_manager_remote_basic( self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions ): - job_starter = UDPJobFactory( + job_starter = ProcessBasedJobCreator( process_id="increment", namespace="https://remote.test/increment.json", parameter_defaults={"increment": 5}, @@ -1321,7 +1321,7 @@ def test_with_job_manager_remote_parameter_handling( df_data, expected_arguments, ): - job_starter = UDPJobFactory( + job_starter = ProcessBasedJobCreator( process_id="increment", namespace="https://remote.test/increment.json", parameter_defaults=parameter_defaults, @@ -1381,7 +1381,7 @@ def test_with_job_manager_remote_parameter_handling( } def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock): - job_starter = UDPJobFactory( + job_starter = ProcessBasedJobCreator( process_id="offset_polygon", namespace="https://remote.test/offset_polygon.json", parameter_defaults={"data": 123}, @@ -1466,7 +1466,7 @@ def test_with_job_manager_remote_geometry_after_resume( self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, db_class ): """Test if geometry handling works properly after resuming from CSV serialized job db.""" - job_starter = UDPJobFactory( + job_starter = ProcessBasedJobCreator( process_id="offset_polygon", namespace="https://remote.test/offset_polygon.json", parameter_defaults={"data": 123}, @@ -1554,7 +1554,7 @@ def test_with_job_manager_udp_basic( # Register personal UDP increment_udp_mock = requests_mock.get(con.build_url("/process_graphs/increment"), json=udp) - job_starter = UDPJobFactory( + job_starter = ProcessBasedJobCreator( process_id="increment", # No namespace to trigger personal UDP mode namespace=None, @@ -1603,7 +1603,7 @@ def test_with_job_manager_udp_basic( def test_with_job_manager_parameter_column_map( self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions ): - job_starter = UDPJobFactory( + job_starter = ProcessBasedJobCreator( process_id="increment", namespace="https://remote.test/increment.json", parameter_column_map={"data": "numberzzz", "increment": "add_thiz"}, From 7bf73dec648841a8a569be856fcf54b9ce357ca4 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Wed, 16 Oct 2024 12:37:21 +0200 Subject: [PATCH 22/22] Issue #604/#644 move ProcessBasedJobCreator example to more extensive doc page --- CHANGELOG.md | 4 +- docs/cookbook/job_manager.rst | 102 +++++++++++++++++++++++++++++++++ docs/rst-cheatsheet.rst | 15 ++++- openeo/extra/job_management.py | 53 +++++------------ 4 files changed, 131 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 
873bd90fb..805bfb369 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame. Also added `create_job_db()` factory to easily create a job database from a given dataframe and its type guessed from filename extension. ([#635](https://github.com/Open-EO/openeo-python-client/issues/635)) -- `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the job run ([#645](https://github.com/Open-EO/openeo-python-client/issues/645)) -- Added `ProcessBasedJobCreator` to be used as `start_job` callable with `MultiBackendJobManager` to create multiple jobs from a single parameterized process (e.g. a UDP or remote process definition) ([#604](https://github.com/Open-EO/openeo-python-client/issues/604)) +- `MultiBackendJobManager.run_jobs()` now returns a dictionary with counters/stats about various events during the full run of the job manager ([#645](https://github.com/Open-EO/openeo-python-client/issues/645)) +- Added (experimental) `ProcessBasedJobCreator` to be used as `start_job` callable with `MultiBackendJobManager` to create multiple jobs from a single parameterized process (e.g. a UDP or remote process definition) ([#604](https://github.com/Open-EO/openeo-python-client/issues/604)) ### Changed diff --git a/docs/cookbook/job_manager.rst b/docs/cookbook/job_manager.rst index c505e44e5..b5219dc72 100644 --- a/docs/cookbook/job_manager.rst +++ b/docs/cookbook/job_manager.rst @@ -2,6 +2,9 @@ Multi Backend Job Manager ==================================== +API +=== + .. warning:: This is a new experimental API, subject to change. @@ -15,6 +18,105 @@ Multi Backend Job Manager .. autoclass:: openeo.extra.job_management.ParquetJobDatabase + .. autoclass:: openeo.extra.job_management.ProcessBasedJobCreator :members: :special-members: __call__ + + +.. _job-management-with-process-based-job-creator: + +Job creation based on parameterized processes +=============================================== + +The openEO API supports parameterized processes out of the box, +which makes it possible to work with flexible, reusable openEO building blocks +in the form of :ref:`user-defined processes <user-defined-processes>` +or `remote openEO process definitions <https://github.com/Open-EO/openeo-api/tree/draft/extensions/remote-process-definition>`_. +This can also be leveraged for job creation in the context of the +:py:class:`~openeo.extra.job_management.MultiBackendJobManager`: +define a "template" job as a parameterized process +and let the job manager fill in the parameters +from a given dataframe. + +The :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` helper class +lets you do exactly that. +Given a reference to a parameterized process, +such as a user-defined process or remote process definition, +it can be used directly as the ``start_job`` callable for +:py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`, +which will fill in the process parameters from the dataframe. + +Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example +----------------------------------------------------------------------------- + +Basic usage example with a remote process definition: +
+.. code-block:: python + :linenos: + :caption: Basic :py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` example snippet + :emphasize-lines: 12-17, 30 + + import pandas as pd + + from openeo.extra.job_management import ( + MultiBackendJobManager, + create_job_db, + ProcessBasedJobCreator, + ) + + # Job creator, based on a parameterized openEO process + # (specified by the remote process definition at given URL) + # which has parameters "start_date" and "bands", for example. + job_starter = ProcessBasedJobCreator( + namespace="https://example.com/my_process.json", + parameter_defaults={ + "bands": ["B02", "B03"], + }, + ) + + # Initialize job database from a dataframe, + # with desired parameter values to fill in. + df = pd.DataFrame({ + "start_date": ["2021-01-01", "2021-02-01", "2021-03-01"], + }) + job_db = create_job_db("jobs.csv").initialize_from_df(df) + + # Create and run job manager, + # which will start a job for each of the `start_date` values in the dataframe + # and use the default band list ["B02", "B03"] for the "bands" parameter. + job_manager = MultiBackendJobManager(...) + job_manager.run_jobs(job_db=job_db, start_job=job_starter) + +In this example, a :py:class:`ProcessBasedJobCreator` is instantiated +based on a remote process definition +that has parameters ``start_date`` and ``bands``. +When it is passed to :py:meth:`~openeo.extra.job_management.MultiBackendJobManager.run_jobs`, +a job will be created for each row in the dataframe, +with parameter values based on matching columns in the dataframe: + +- the ``start_date`` parameter will be filled in + with the values from the "start_date" column of the dataframe, +- the ``bands`` parameter has no corresponding column in the dataframe, + and will get its value from the default specified in the ``parameter_defaults`` argument. + + +:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` with geometry handling +--------------------------------------------------------------------------------------------- + +Apart from the intuitive name-based parameter-column linking, +:py:class:`~openeo.extra.job_management.ProcessBasedJobCreator` +also automatically links: + +- a process parameter that accepts inline GeoJSON geometries/features + (which practically means it has a schema like ``{"type": "object", "subtype": "geojson"}``, + as produced by :py:meth:`Parameter.geojson <openeo.api.process.Parameter.geojson>`) +- with the geometry column in a `GeoPandas <https://geopandas.org/>`_ dataframe, + +even if the name of the parameter does not exactly match +the name of the GeoPandas geometry column (``geometry`` by default). +This automatic linking only happens when there is exactly one +GeoJSON parameter and one geometry column in the dataframe. + + +.. admonition:: to do + + Add example with geometry handling + (a tentative sketch is appended at the end of this document). diff --git a/docs/rst-cheatsheet.rst b/docs/rst-cheatsheet.rst index d1bd37360..c02e4a15d 100644 --- a/docs/rst-cheatsheet.rst +++ b/docs/rst-cheatsheet.rst @@ -50,6 +50,15 @@ More explicit code block with language hint (and no need for double colon) >>> 3 + 5 8 +Code block with additional features (line numbers, caption, highlighted lines, +for more see https://www.sphinx-doc.org/en/master/usage/restructuredtext/directives.html#directive-code-block) + +
+.. code-block:: python + :linenos: + :caption: how to say hello + :emphasize-lines: 1 + + print("hello world") References: @@ -60,4 +69,8 @@ References: - refer to the reference with:: - :ref:`target` + :ref:`target` or :ref:`custom text <target>` + +- inline URL references:: + + `Python <https://www.python.org/>`_ diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 09dc5bdef..d7f370291 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -948,47 +948,13 @@ class ProcessBasedJobCreator: for each row of the dataframe managed by the :py:class:`MultiBackendJobManager` by filling in the process parameters with corresponding row values. - Usage example with a remote process definition: + .. seealso:: + See :ref:`job-management-with-process-based-job-creator` + for more information and examples. - - .. code-block:: python - - from openeo.extra.job_management import ( - MultiBackendJobManager, - create_job_db, - ProcessBasedJobCreator, - ) - - # Job creator, based on a parameterized openEO process - # (specified by the remote process definition at given URL) - # which has, say, parameters "start_date" and "bands" for example. - job_starter = ProcessBasedJobCreator( - namespace="https://example.com/my_process.json", - parameter_defaults={ - # Default value for the "bands" parameter - # (to be used when not available in the dataframe) - "bands": ["B02", "B03"], - }, - ) - - # Initialize job database from a dataframe, - # with desired parameter values to fill in. - df = pd.DataFrame({ - "start_date": ["2021-01-01", "2021-02-01", "2021-03-01"], - ... - }) - job_db = create_job_db("jobs.csv").initialize_from_df(df) - - # Create and run job manager - job_manager = MultiBackendJobManager(...) - job_manager.run_jobs(job_db=job_db, start_job=job_starter) - - The factory will take care of filling in the process parameters - based on matching column names in the dataframe from the job database - (like "start_date" in the example above). - - This intuitive name-based matching should cover most use cases, - but for some more advanced use cases, there are additional options - to provide overrides and fallbacks: + Process parameters are linked to dataframe columns by name. + While this intuitive name-based matching should cover most use cases, + there are additional options for overrides or fallbacks: - When provided, ``parameter_column_map`` will be consulted for resolving a process parameter name (key in the dictionary) @@ -1010,6 +976,7 @@ class ProcessBasedJobCreator: - Finally if no (default) value can be determined and the parameter is not flagged as optional, an error will be raised. + :param process_id: (optional) openEO process identifier. Can be omitted when working with a remote process definition that is fully defined with a URL in the ``namespace`` parameter. :param namespace: (optional) openEO process namespace. Typically used to provide a URL to a remote process definition. :param parameter_defaults: (optional) default values for process parameters, to be used when not available in the dataframe dedicated for that parameter. :param parameter_column_map: (optional) mapping of process parameter names (keys) to dataframe column names as value. .. versionadded:: 0.33.0 + + .. warning:: + This is an experimental API subject to change, + and we greatly welcome + `feedback and suggestions for improvement <https://github.com/Open-EO/openeo-python-client/issues>`_. + """ def __init__( self,
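A tentative sketch of the geometry handling example that the "to do" admonition above asks for; it is not part of the patch itself. The ``offset_polygon`` process id, its namespace URL and the ``data`` parameter are borrowed from the test suite in this patch series, and the assumption that this process exposes exactly one GeoJSON-type geometry parameter (so that the automatic linking with the GeoDataFrame geometry column kicks in) is illustrative.

.. code-block:: python

    import geopandas as gpd
    import openeo
    from shapely.geometry import Polygon

    from openeo.extra.job_management import (
        MultiBackendJobManager,
        ProcessBasedJobCreator,
        create_job_db,
    )

    # Job creator based on a (hypothetical) remote process definition
    # with a numeric "data" parameter and a single GeoJSON geometry parameter.
    job_starter = ProcessBasedJobCreator(
        process_id="offset_polygon",
        namespace="https://remote.test/offset_polygon.json",
        # Fallback for the "data" parameter; the "data" column below
        # takes precedence over this default.
        parameter_defaults={"data": 123},
    )

    # GeoPandas dataframe with a single geometry column:
    # it will be linked automatically to the single GeoJSON parameter
    # of the process, even if that parameter is not called "geometry".
    df = gpd.GeoDataFrame(
        {
            "data": [1, 2],
            "geometry": [
                Polygon([(0, 0), (2, 0), (1, 2)]),
                Polygon([(5, 5), (7, 5), (6, 7)]),
            ],
        }
    )
    job_db = create_job_db("jobs-geometry.csv").initialize_from_df(df)

    job_manager = MultiBackendJobManager()
    # Placeholder backend URL for illustration.
    job_manager.add_backend("demo", connection=openeo.connect("https://openeo.example"))
    job_manager.run_jobs(job_db=job_db, start_job=job_starter)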
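Likewise, a small sketch of the ``parameter_column_map`` override described in the docstring above; the ``increment`` process and the deliberately mismatched column names are taken from the test suite, the rest is illustrative.

.. code-block:: python

    import pandas as pd

    from openeo.extra.job_management import ProcessBasedJobCreator, create_job_db

    # The process parameters "data" and "increment" do not match any column
    # name here, so parameter_column_map explicitly routes them to the
    # "numberzzz" and "add_thiz" columns of the dataframe.
    job_starter = ProcessBasedJobCreator(
        process_id="increment",
        namespace="https://remote.test/increment.json",
        parameter_column_map={"data": "numberzzz", "increment": "add_thiz"},
    )

    df = pd.DataFrame({"numberzzz": [3, 8], "add_thiz": [5, 5]})
    job_db = create_job_db("jobs-increment.csv").initialize_from_df(df)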