Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue404 automatically validate process graph before download or execute #481

Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4de7d52
Issue #404 Add automatic process graph validation, when backend suppo…
JohanKJSchreurs Oct 3, 2023
876a7c2
Issue #404 Pass validate parameter through in datacube
JohanKJSchreurs Oct 3, 2023
adde746
Issue #404 Tests: Simplify fixture connection_with_pgvalidation
JohanKJSchreurs Oct 3, 2023
bdf5206
Issue #404 Pass validate parameter through in vectorcube
JohanKJSchreurs Oct 3, 2023
2732869
Issue #404 Add test coverage for processgraph validation in datacube
JohanKJSchreurs Oct 3, 2023
ec31b84
Issue #404 Add test coverage for processgraph validation in vectorcube
JohanKJSchreurs Oct 3, 2023
405be57
Issue #404 Remove toggle VALIDATE_PROCESS_GRAPH_BY_DEFAULT
JohanKJSchreurs Oct 4, 2023
41dd4ec
Issue #404 Update changelog
JohanKJSchreurs Oct 4, 2023
f8d523c
Issue #404 Code review, more robust error handling during validation
JohanKJSchreurs Oct 4, 2023
455632d
Issue #404 Minor clarifications in unit test
JohanKJSchreurs Oct 4, 2023
54a42d3
Issue #404/PR #481 introduce connection level auto_validate option
soxofaan Oct 17, 2023
0157f8e
fixup! Issue #404/PR #481 introduce connection level auto_validate op…
soxofaan Oct 17, 2023
788a954
Issue #404/PR#481 test_connection streamlining
soxofaan Oct 17, 2023
dd4804c
Issue #404/PR#481 test_datacube100 streamlining
soxofaan Oct 18, 2023
c5e145e
fixup! Issue #404/PR#481 test_datacube100 streamlining
soxofaan Oct 18, 2023
ec63990
fixup! fixup! Issue #404/PR#481 test_datacube100 streamlining
soxofaan Oct 18, 2023
d6f8797
fixup! fixup! fixup! Issue #404/PR#481 test_datacube100 streamlining
soxofaan Oct 18, 2023
b90f252
fixup! fixup! fixup! fixup! Issue #404/PR#481 test_datacube100 stream…
soxofaan Oct 18, 2023
0788181
fixup! fixup! fixup! fixup! fixup! Issue #404/PR#481 test_datacube100…
soxofaan Oct 18, 2023
93b423a
fixup! fixup! fixup! fixup! fixup! fixup! Issue #404/PR#481 test_data…
soxofaan Oct 18, 2023
2015b9e
fixup! fixup! fixup! fixup! fixup! fixup! fixup! Issue #404/PR#481 te…
soxofaan Oct 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,14 @@ def validate_process_graph(self, process_graph: dict) -> List[dict]:
:param process_graph: (flat) dict representing process graph
:return: list of errors (dictionaries with "code" and "message" fields)
"""
request = {"process_graph": process_graph}
# TODO: sometimes process_graph is already in the graph. Should we really *always* add it?
# Was getting errors in some new unit tests because of the double process_graph but
# perhaps the error is really not here but somewhere else that adds process_graph
# when it should not? Still needs to be confirmed.
if "process_graph" not in process_graph:
request = {"process_graph": process_graph}
else:
request = process_graph
JohanKJSchreurs marked this conversation as resolved.
Show resolved Hide resolved
return self.post(path="/validation", json=request, expected_status=200).json()["errors"]

@property
Expand Down Expand Up @@ -1474,12 +1481,27 @@ def _build_request_with_process_graph(self, process_graph: Union[dict, FlatGraph
result["process"] = process_graph
return result

def _warn_if_process_graph_invalid(self, process_graph: Union[dict, FlatGraphableMixin, str, Path]):
    """
    Validate the given process graph against the backend's ``POST /validation``
    endpoint (when supported) and log a warning for each reported error.

    Validation is best-effort: a failure while validating (e.g. a request
    error) is logged and must not break the actual download/execute flow.

    :param process_graph: process graph in any representation accepted by
        :py:func:`as_flat_graph`
    """
    # Backend does not advertise validation support: nothing to do.
    if not self.capabilities().supports_endpoint("/validation", "POST"):
        return

    graph = as_flat_graph(process_graph)
    # The validation request expects a {"process_graph": ...} wrapper;
    # only add it when it is not already present.
    if "process_graph" not in graph:
        graph = {"process_graph": graph}

    try:
        validation_errors = self.validate_process_graph(process_graph=graph)
    except Exception as e:
        # Validation is an optional aid: log and continue instead of failing
        # the caller's download/execute/create_job request.
        _log.error(f"Unexpected failure while validating process graph: {e!r}")
        return
    if validation_errors:
        _log.warning(
            "Process graph is not valid. Validation errors:\n"
            # Use .get() defensively: backends should provide "message"
            # (and "code"), but a malformed error dict must not raise here.
            + "\n".join(str(e.get("message", e)) for e in validation_errors)
        )

# TODO: unify `download` and `execute` better: e.g. `download` always writes to disk, `execute` returns result (raw or as JSON decoded dict)
def download(
self,
graph: Union[dict, FlatGraphableMixin, str, Path],
outputfile: Union[Path, str, None] = None,
timeout: Optional[int] = None,
validate: Optional[bool] = True,
JohanKJSchreurs marked this conversation as resolved.
Show resolved Hide resolved
) -> Union[None, bytes]:
"""
Downloads the result of a process graph synchronously,
Expand All @@ -1491,6 +1513,9 @@ def download(
:param outputfile: output file
:param timeout: timeout to wait for response
"""
if validate:
self._warn_if_process_graph_invalid(process_graph=graph)

request = self._build_request_with_process_graph(process_graph=graph)
response = self.post(
path="/result",
Expand All @@ -1511,6 +1536,7 @@ def execute(
self,
process_graph: Union[dict, str, Path],
timeout: Optional[int] = None,
validate: Optional[bool] = True,
):
"""
Execute a process graph synchronously and return the result (assumed to be JSON).
Expand All @@ -1519,6 +1545,9 @@ def execute(
or as local file path or URL
:return: parsed JSON response
"""
if validate:
self._warn_if_process_graph_invalid(process_graph=process_graph)

req = self._build_request_with_process_graph(process_graph=process_graph)
return self.post(
path="/result",
Expand All @@ -1536,6 +1565,7 @@ def create_job(
plan: Optional[str] = None,
budget: Optional[float] = None,
additional: Optional[dict] = None,
validate: Optional[bool] = True,
) -> BatchJob:
"""
Create a new job from given process graph on the back-end.
Expand All @@ -1550,6 +1580,10 @@ def create_job(
:return: Created job
"""
# TODO move all this (BatchJob factory) logic to BatchJob?

if validate:
self._warn_if_process_graph_invalid(process_graph=process_graph)

req = self._build_request_with_process_graph(
process_graph=process_graph,
**dict_no_none(title=title, description=description, plan=plan, budget=budget)
Expand Down
14 changes: 8 additions & 6 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,7 @@ def download(
outputfile: Optional[Union[str, pathlib.Path]] = None,
format: Optional[str] = None,
options: Optional[dict] = None,
validate: Optional[bool] = True,
) -> Union[None, bytes]:
"""
Execute synchronously and download the raster data cube, e.g. as GeoTIFF.
Expand All @@ -1961,7 +1962,7 @@ def download(
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
format = guess_format(outputfile)
cube = self._ensure_save_result(format=format, options=options)
return self._connection.download(cube.flat_graph(), outputfile)
return self._connection.download(cube.flat_graph(), outputfile, validate=validate)

def validate(self) -> List[dict]:
"""
Expand Down Expand Up @@ -2062,6 +2063,7 @@ def execute_batch(
max_poll_interval: float = 60,
connection_retry_interval: float = 30,
job_options: Optional[dict] = None,
validate: Optional[bool] = True,
# TODO: avoid `format_options` as keyword arguments
**format_options,
) -> BatchJob:
Expand All @@ -2081,9 +2083,7 @@ def execute_batch(
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
out_format = guess_format(outputfile)

job = self.create_job(
out_format=out_format, job_options=job_options, **format_options
)
job = self.create_job(out_format=out_format, job_options=job_options, validate=validate, **format_options)
return job.run_synchronous(
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
Expand All @@ -2098,6 +2098,7 @@ def create_job(
plan: Optional[str] = None,
budget: Optional[float] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = True,
# TODO: avoid `format_options` as keyword arguments
**format_options,
) -> BatchJob:
Expand Down Expand Up @@ -2127,6 +2128,7 @@ def create_job(
description=description,
plan=plan,
budget=budget,
validate=validate,
additional=job_options,
)

Expand Down Expand Up @@ -2162,9 +2164,9 @@ def save_user_defined_process(
returns=returns, categories=categories, examples=examples, links=links,
)

def execute(self) -> dict:
def execute(self, validate: Optional[bool] = True) -> dict:
"""Executes the process graph of the imagery. """
return self._connection.execute(self.flat_graph())
return self._connection.execute(self.flat_graph(), validate=validate)

@staticmethod
@deprecated(reason="Use :py:func:`openeo.udf.run_code.execute_local_udf` instead", version="0.7.0")
Expand Down
12 changes: 8 additions & 4 deletions openeo/rest/vectorcube.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,16 @@ def _ensure_save_result(
cube = self.save_result(format=format or "GeoJSON", options=options)
return cube

def execute(self) -> dict:
def execute(self, validate: Optional[bool] = True) -> dict:
"""Executes the process graph of the imagery."""
return self._connection.execute(self.flat_graph())
return self._connection.execute(self.flat_graph(), validate=validate)

def download(
self,
outputfile: Optional[Union[str, pathlib.Path]] = None,
format: Optional[str] = None,
options: Optional[dict] = None,
validate: Optional[bool] = True,
) -> Union[None, bytes]:
"""
Execute synchronously and download the vector cube.
Expand All @@ -256,7 +257,7 @@ def download(
if format is None and outputfile:
format = guess_format(outputfile)
cube = self._ensure_save_result(format=format, options=options)
return self._connection.download(cube.flat_graph(), outputfile)
return self._connection.download(cube.flat_graph(), outputfile, validate=validate)

def execute_batch(
self,
Expand All @@ -266,6 +267,7 @@ def execute_batch(
max_poll_interval: float = 60,
connection_retry_interval: float = 30,
job_options: Optional[dict] = None,
validate: Optional[bool] = True,
# TODO: avoid using kwargs as format options
**format_options,
) -> BatchJob:
Expand All @@ -287,7 +289,7 @@ def execute_batch(
# TODO #401/#449 don't guess/override format if there is already a save_result with format?
out_format = guess_format(outputfile)

job = self.create_job(out_format, job_options=job_options, **format_options)
job = self.create_job(out_format, job_options=job_options, validate=validate, **format_options)
return job.run_synchronous(
# TODO #135 support multi file result sets too
outputfile=outputfile,
Expand All @@ -303,6 +305,7 @@ def create_job(
plan: Optional[str] = None,
budget: Optional[float] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = True,
**format_options,
) -> BatchJob:
"""
Expand All @@ -327,6 +330,7 @@ def create_job(
plan=plan,
budget=budget,
additional=job_options,
validate=validate,
)

send_job = legacy_alias(create_job, name="send_job", since="0.10.0")
Expand Down
11 changes: 10 additions & 1 deletion tests/rest/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import contextlib
import re
import typing
from typing import List, Optional
from unittest import mock

import pytest
import time_machine

from openeo.rest._testing import DummyBackend
import openeo
from openeo.rest._testing import DummyBackend, build_capabilities
from openeo.rest.connection import Connection

API_URL = "https://oeo.test/"
Expand Down Expand Up @@ -87,3 +89,10 @@ def con120(requests_mock):
@pytest.fixture
def dummy_backend(requests_mock, con100) -> DummyBackend:
yield DummyBackend(requests_mock=requests_mock, connection=con100)


@pytest.fixture
def connection_with_pgvalidation(api_version, requests_mock) -> Connection:
    """Connection fixture to a backend that supports validation of the process graph."""
    capabilities = build_capabilities(api_version=api_version, validation=True)
    requests_mock.get(API_URL, json=capabilities)
    return openeo.connect(API_URL)
6 changes: 6 additions & 0 deletions tests/rest/datacube/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ def con100(requests_mock, support_udp) -> Connection:
return _setup_connection("1.0.0", requests_mock, build_capabilities_kwargs={"udp": support_udp})


@pytest.fixture
def connection_with_pgvalidation_datacube(api_version, requests_mock, support_udp) -> Connection:
    """Connection fixture to a backend that supports validation of the process graph."""
    # `support_udp` must be requested as a fixture parameter: the original body
    # referenced it without declaring it, which raises NameError at setup time
    # (compare the sibling `con100` fixture, which declares it).
    return _setup_connection("1.0.0", requests_mock, build_capabilities_kwargs={"udp": support_udp, "validation": True})


@pytest.fixture
def s2cube(connection, api_version) -> DataCube:
return connection.load_collection("S2")
Expand Down
96 changes: 96 additions & 0 deletions tests/rest/datacube/test_datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,19 @@
- 1.0.0-style DataCube

"""
import json
import pathlib
from datetime import date, datetime
from unittest import mock

import numpy as np
import pytest
import requests
import shapely
import shapely.geometry

from openeo.rest import BandMathException
from openeo.rest.connection import Connection
from openeo.rest.datacube import DataCube

from ... import load_json_resource
Expand Down Expand Up @@ -807,3 +810,96 @@ def test_save_result_format_options_vs_execute_batch(elf, s2cube, get_create_job
},
"result": True,
}


class TestProcessGraphValidation:
    """
    Tests that DataCube entry points (create_job/execute/execute_batch) pass the
    `validate` flag through to the connection, and that the backend's
    `POST /validation` endpoint is hit if and only if validation was requested.
    """

    JOB_ID = "j-123"
    PROCESS_GRAPH_DICT = {"add1": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}
    PROCESS_GRAPH_STRING = json.dumps(PROCESS_GRAPH_DICT)

    @pytest.fixture
    def cube_add(self, requests_mock, connection_with_pgvalidation_datacube: Connection) -> DataCube:
        # Cube built from a raw process graph, against a validation-capable backend.
        requests_mock.post(API_URL + "/result", content=self._post_result_handler_json)
        return connection_with_pgvalidation_datacube.datacube_from_json(self.PROCESS_GRAPH_STRING)

    def _post_jobs_handler_json(self, response: requests.Request, context):
        """requests_mock handler for `POST /jobs`: announce the created job id."""
        context.headers["OpenEO-Identifier"] = self.JOB_ID
        return b""

    def _post_result_handler_json(self, response: requests.Request, context):
        """requests_mock handler for `POST /result`: check the submitted graph and return a result."""
        pg = response.json()["process"]["process_graph"]
        assert pg == self.PROCESS_GRAPH_DICT
        return b'{"answer": 8}'

    @pytest.mark.parametrize("validate", [True, False])
    def test_create_job_with_pg_validation(
        self,
        requests_mock,
        connection_with_pgvalidation_datacube: Connection,
        validate,
    ):
        """The DataCube should pass through request for the validation to the
        connection and the validation endpoint should only be called when
        validation was requested.
        """
        m = requests_mock.post(API_URL + "/validation", json={"errors": []})

        requests_mock.post(API_URL + "/jobs", status_code=201, content=self._post_jobs_handler_json)
        cube: DataCube = connection_with_pgvalidation_datacube.load_collection("S2")
        cube.create_job(validate=validate)

        # Validation should be called if and only if it was requested
        expected_call_count = 1 if validate else 0
        assert m.call_count == expected_call_count

    @pytest.mark.parametrize("validate", [True, False])
    def test_execute_with_pg_validation(
        self,
        requests_mock,
        cube_add: DataCube,
        validate,
    ):
        """The DataCube should pass through request for the validation to the
        connection and the validation endpoint should only be called when
        validation was requested.
        """
        m = requests_mock.post(API_URL + "/validation", json={"errors": []})
        requests_mock.post(API_URL + "/jobs", status_code=201, content=self._post_jobs_handler_json)
        requests_mock.post(API_URL + "/result", content=self._post_result_handler_json)

        cube_add.execute(validate=validate)

        # Validation should be called if and only if it was requested
        expected_call_count = 1 if validate else 0
        assert m.call_count == expected_call_count

    @pytest.mark.parametrize("validate", [True, False])
    def test_execute_batch_with_pg_validation(
        self,
        requests_mock,
        cube_add: DataCube,
        validate,
    ):
        """The DataCube should pass through request for the validation to the
        connection and the validation endpoint should only be called when
        validation was requested.
        """
        m = requests_mock.post(API_URL + "/validation", json={"errors": []})
        requests_mock.post(API_URL + "/jobs", status_code=201, content=self._post_jobs_handler_json)
        requests_mock.post(API_URL + f"/jobs/{self.JOB_ID}/results", status_code=202)
        job_metadata = {
            "id": self.JOB_ID,
            # Bug fix: the original used f"Job {self.JOB_ID,}" — the trailing
            # comma builds a one-element tuple, rendering "Job ('j-123',)".
            "title": f"Job {self.JOB_ID}",
            "description": f"Job {self.JOB_ID}",
            "process": self.PROCESS_GRAPH_DICT,
            "status": "finished",
            "created": "2017-01-01T09:32:12Z",
            "links": [],
        }
        requests_mock.get(API_URL + f"/jobs/{self.JOB_ID}", status_code=200, json=job_metadata)

        cube_add.execute_batch(validate=validate)

        # Validation should be called if and only if it was requested
        expected_call_count = 1 if validate else 0
        assert m.call_count == expected_call_count
Loading