From 0a8c333fec5ebc9a8984a8249bdf39b3d81088bf Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 10 Jun 2021 00:34:35 -0400 Subject: [PATCH] conformance of process description with OGC-API v1.0-draft6 + better support of datatype field/attr/property auto-resolution --- CHANGES.rst | 2 + weaver/datatype.py | 163 +++++++++++++++------- weaver/processes/utils.py | 3 + weaver/typedefs.py | 4 +- weaver/wps_restapi/processes/processes.py | 2 +- weaver/wps_restapi/swagger_definitions.py | 57 +++++--- weaver/wps_restapi/utils.py | 2 +- 7 files changed, 160 insertions(+), 73 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 9d30405d7..656eee703 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -15,6 +15,8 @@ Changes: - Remove automatic conversion of falsy/truthy ``string`` and ``integer`` type definitions to ``boolean`` type to align with OpenAPI ``boolean`` type definitions. Non explicit ``boolean`` values will not be automatically converted to ``bool`` anymore. They will require explicit ``false|true`` values. +- Apply conformance updates to align with expected process description schema from + `OGC-API - Processes v1.0-draft6 `_. Fixes: ------ diff --git a/weaver/datatype.py b/weaver/datatype.py index ce2374ecf..35a4f9657 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -2,6 +2,7 @@ Definitions of types used by tokens. """ import copy +import inspect import traceback import uuid import warnings @@ -15,8 +16,13 @@ from pywps import Process as ProcessWPS from weaver.exceptions import ProcessInstanceError -from weaver.execute import EXECUTE_CONTROL_OPTION_ASYNC, EXECUTE_CONTROL_OPTIONS -from weaver.formats import ACCEPT_LANGUAGE_EN_US, CONTENT_TYPE_APP_JSON +from weaver.execute import ( + EXECUTE_CONTROL_OPTION_ASYNC, + EXECUTE_CONTROL_OPTIONS, + EXECUTE_TRANSMISSION_MODE_OPTIONS, + EXECUTE_TRANSMISSION_MODE_REFERENCE +) +from weaver.formats import ACCEPT_LANGUAGE_EN_CA, CONTENT_TYPE_APP_JSON, CONTENT_TYPE_APP_XML from weaver.processes.convert import ows2json, wps2json_io from weaver.processes.types import ( PROCESS_APPLICATION, @@ -63,18 +69,16 @@ def __setattr__(self, item, value): prop = getattr(type(self), item) if isinstance(prop, property) and prop.fset is not None: prop.fset(self, value) # noqa - elif item in self: - self[item] = value else: - raise AttributeError("Can't set attribute '{}'.".format(item)) + super(Base, self).__setitem__(item, value) - def __getattr__(self, item): + def __getitem__(self, item): # use existing property getter if defined prop = getattr(type(self), item) if isinstance(prop, property) and prop.fget is not None: - return prop.fget(self, item) # noqa + return prop.fget(self) # noqa elif item in self: - return self[item] + return getattr(self, item, None) else: raise AttributeError("Can't get attribute '{}'.".format(item)) @@ -118,6 +122,20 @@ def params(self): """ raise NotImplementedError("Method 'params' must be defined for storage item representation.") + def dict(self): + """ + Generate a dictionary representation of the object, but with inplace resolution of attributes as applicable. + """ + # update any entries by key with their attribute + _dict = {key: getattr(self, key, dict.__getitem__(self, key)) for key, val in self.items()} + # then, ensure any missing key gets added if a getter property exists for it + props = {prop[0] for prop in inspect.getmembers(self) if not prop[0].startswith("_") and prop[0] not in _dict} + for key in props: + prop = getattr(type(self), key) + if isinstance(prop, property) and prop.fget is not None: + _dict[key] = prop.fget(self) # noqa + return _dict + class Service(Base): """ @@ -138,12 +156,12 @@ def id(self): @property def url(self): """Service URL.""" - return self["url"] + return dict.__getitem__(self, "url") @property def name(self): """Service name.""" - return self["name"] + return dict.__getitem__(self, "name") @property def type(self): @@ -364,7 +382,7 @@ def _get_inputs(self): # type: () -> List[Optional[Dict[str, Any]]] if self.get("inputs") is None: self["inputs"] = list() - return self["inputs"] + return dict.__getitem__(self, "inputs") def _set_inputs(self, inputs): # type: (List[Optional[Dict[str, Any]]]) -> None @@ -560,7 +578,7 @@ def _get_results(self): # type: () -> List[Optional[Dict[str, Any]]] if self.get("results") is None: self["results"] = list() - return self["results"] + return dict.__getitem__(self, "results") def _set_results(self, results): # type: (List[Optional[Dict[str, Any]]]) -> None @@ -575,7 +593,7 @@ def _get_exceptions(self): # type: () -> List[Optional[Dict[str, str]]] if self.get("exceptions") is None: self["exceptions"] = list() - return self["exceptions"] + return dict.__getitem__(self, "exceptions") def _set_exceptions(self, exceptions): # type: (List[Optional[Dict[str, str]]]) -> None @@ -590,7 +608,7 @@ def _get_logs(self): # type: () -> List[Dict[str, str]] if self.get("logs") is None: self["logs"] = list() - return self["logs"] + return dict.__getitem__(self, "logs") def _set_logs(self, logs): # type: (List[Dict[str, str]]) -> None @@ -605,7 +623,7 @@ def _get_tags(self): # type: () -> List[Optional[str]] if self.get("tags") is None: self["tags"] = list() - return self["tags"] + return dict.__getitem__(self, "tags") def _set_tags(self, tags): # type: (List[Optional[str]]) -> None @@ -660,12 +678,11 @@ def response(self, response): response = lxml.etree.tostring(response) self["response"] = response - def _job_url(self, settings): - base_job_url = get_wps_restapi_base_url(settings) + def _job_url(self, base_url=None): if self.service is not None: - base_job_url += sd.provider_service.path.format(provider_id=self.service) + base_url += sd.provider_service.path.format(provider_id=self.service) job_path = sd.process_job_service.path.format(process_id=self.process, job_id=self.id) - return "{base_job_url}{job_path}".format(base_job_url=base_job_url, job_path=job_path) + return "{base_job_url}{job_path}".format(base_job_url=base_url, job_path=job_path) def links(self, container=None, self_link=None): # type: (Optional[AnySettingsContainer], Optional[str]) -> JSON @@ -677,10 +694,14 @@ def links(self, container=None, self_link=None): :param container: object that helps retrieve instance details, namely the host URL. :param self_link: name of a section that represents the current link that will be returned. """ - settings = get_settings(container) if container else {} - job_url = self._job_url(settings) + settings = get_settings(container) + base_url = get_wps_restapi_base_url(settings) + job_url = self._job_url(base_url) + job_list = "{}/{}".format(base_url, sd.jobs_service.path) job_links_body = {"links": [ {"href": job_url, "rel": "status", "title": "Job status."}, + {"href": job_url, "rel": "monitor", "title": "Job monitoring location."}, + {"href": job_list, "rel": "collection", "title": "List of submitted jobs."} ]} job_links = ["logs", "inputs"] if self.status in JOB_STATUS_CATEGORIES[STATUS_CATEGORY_FINISHED]: @@ -692,9 +713,6 @@ def links(self, container=None, self_link=None): for link_type in job_links: link_href = "{job_url}/{res}".format(job_url=job_url, res=link_type) job_links_body["links"].append({"href": link_href, "rel": link_type, "title": "Job {}.".format(link_type)}) - link_meta = {"type": CONTENT_TYPE_APP_JSON, "hreflang": ACCEPT_LANGUAGE_EN_US} - for link in job_links_body["links"]: - link.update(link_meta) if self_link in ["status", "inputs", "outputs", "results", "logs", "exceptions"]: self_link_body = list(filter(lambda _link: _link["rel"] == self_link, job_links_body["links"]))[-1] self_link_body = copy.deepcopy(self_link_body) @@ -702,6 +720,9 @@ def links(self, container=None, self_link=None): self_link_body = {"href": job_url, "title": "Job status."} self_link_body["rel"] = "self" job_links_body["links"].append(self_link_body) + link_meta = {"type": CONTENT_TYPE_APP_JSON, "hreflang": ACCEPT_LANGUAGE_EN_CA} + for link in job_links_body["links"]: + link.update(link_meta) return job_links_body def json(self, container=None, self_link=None): # pylint: disable=W0221,arguments-differ @@ -782,7 +803,7 @@ def __init__(self, *args, **kwargs): @property def id(self): # type: () -> str - return self["id"] + return dict.__getitem__(self, "id") @property def identifier(self): @@ -804,10 +825,20 @@ def abstract(self): # type: () -> str return self.get("abstract", "") + @property + def description(self): + # OGC-API-Processes v1 field representation + # bw-compat with existing processes that defined it as abstract + return self.abstract or self.get("description", "") + @property def keywords(self): # type: () -> List[str] - return self.get("keywords", []) + keywords = self.setdefault("keywords", []) + if self.type not in keywords: + keywords.append(self.type) + self["keywords"] = keywords + return dict.__getitem__(self, "keywords") @property def metadata(self): @@ -848,18 +879,26 @@ def outputs(self): @property def jobControlOptions(self): # noqa: N802 # type: () -> List[str] - self.setdefault("jobControlOptions", [EXECUTE_CONTROL_OPTION_ASYNC]) - if not isinstance(self["jobControlOptions"], list): # eg: None, bw-compat - self["jobControlOptions"] = [EXECUTE_CONTROL_OPTION_ASYNC] - self["jobControlOptions"] = [mode for mode in self["jobControlOptions"] if mode in EXECUTE_CONTROL_OPTIONS] - if len(self["jobControlOptions"]) == 0: - self["jobControlOptions"].append(EXECUTE_CONTROL_OPTION_ASYNC) - return self.get("jobControlOptions") + jco = self.setdefault("jobControlOptions", [EXECUTE_CONTROL_OPTION_ASYNC]) + if not isinstance(jco, list): # eg: None, bw-compat + jco = [EXECUTE_CONTROL_OPTION_ASYNC] + jco = [mode for mode in jco if mode in EXECUTE_CONTROL_OPTIONS] + if len(jco) == 0: + jco.append(EXECUTE_CONTROL_OPTION_ASYNC) + self["jobControlOptions"] = jco + return dict.__getitem__(self, "jobControlOptions") @property def outputTransmission(self): # noqa: N802 # type: () -> List[str] - return self.get("outputTransmission", []) + out = self.setdefault("outputTransmission", [EXECUTE_TRANSMISSION_MODE_REFERENCE]) + if not isinstance(out, list): # eg: None, bw-compat + out = [EXECUTE_TRANSMISSION_MODE_REFERENCE] + out = [mode for mode in out if mode in EXECUTE_TRANSMISSION_MODE_OPTIONS] + if len(out) == 0: + out.append(EXECUTE_TRANSMISSION_MODE_REFERENCE) + self["outputTransmission"] = out + return dict.__getitem__(self, "outputTransmission") @property def processDescriptionURL(self): # noqa: N802 @@ -1018,28 +1057,50 @@ def json(self): """ Obtains the JSON serializable complete representation of the process. """ - return sd.Process().deserialize(self) + return sd.Process().deserialize(self.dict()) + + def links(self, container=None): + # type: (Optional[AnySettingsContainer]) -> JSON + """Obtains the JSON links section of many response body for the process. + + :param container: object that helps retrieve instance details, namely the host URL. + """ + settings = get_settings(container) + base_url = get_wps_restapi_base_url(settings) + proc_desc = sd.process_service.path.format(process_id=self.id) + proc_list = sd.processes_service.path + proc_exec = sd.process_execution_service.path.format(process_id=self.id) + links = [ + {"href": base_url + proc_desc, "rel": "self", "title": "Process description."}, + {"href": base_url + proc_desc, "rel": "process-desc", "title": "Process description."}, + {"href": base_url + proc_exec, "rel": "execute", "title": "Process execution endpoint for job submission."}, + {"href": base_url + proc_list, "rel": "collection", "title": "List of registered processes."} + ] + if self.processEndpointWPS1: + wps_url = "{}?service=WPS&request=GetCapabilities".format(self.processEndpointWPS1) + links.append({"href": wps_url, "rel": "service-desc", + "type": CONTENT_TYPE_APP_XML, "title": "Service definition."}) + for link in links: + link.setdefault("type", CONTENT_TYPE_APP_JSON) + link.setdefault("hreflang", ACCEPT_LANGUAGE_EN_CA) + return {"links": links} def offering(self): # type: () -> JSON """ Obtains the JSON serializable offering representation of the process. """ - process_offering = {"process": self} - if self.version: - process_offering.update({"processVersion": self.version}) - if self.jobControlOptions: - process_offering.update({"jobControlOptions": self.jobControlOptions}) - if self.outputTransmission: - process_offering.update({"outputTransmission": self.outputTransmission}) - return sd.ProcessOffering().deserialize(process_offering) + process = self.dict() + process.update(self.links()) + return sd.ProcessOffering().deserialize(process) def summary(self): # type: () -> JSON """ Obtains the JSON serializable summary representation of the process. """ - return sd.ProcessSummary().deserialize(self) + process = self.dict() + return sd.ProcessSummary().deserialize(process) @staticmethod def from_wps(wps_process, **extra_params): @@ -1171,7 +1232,7 @@ def __init__(self, *args, **kwargs): @property def id(self): """Quote ID.""" - return self["id"] + return dict.__getitem__(self, "id") @property def title(self): @@ -1191,12 +1252,12 @@ def details(self): @property def user(self): """User ID requesting the quote""" - return self["user"] + return dict.__getitem__(self, "user") @property def process(self): """WPS Process ID.""" - return self["process"] + return dict.__getitem__(self, "process") @property def estimatedTime(self): # noqa: N802 @@ -1302,22 +1363,22 @@ def __init__(self, *args, **kwargs): @property def id(self): """Bill ID.""" - return self["id"] + return dict.__getitem__(self, "id") @property def user(self): """User ID""" - return self["user"] + return dict.__getitem__(self, "user") @property def quote(self): """Quote ID.""" - return self["quote"] + return dict.__getitem__(self, "quote") @property def job(self): """Job ID.""" - return self["job"] + return dict.__getitem__(self, "job") @property def price(self): diff --git a/weaver/processes/utils.py b/weaver/processes/utils.py index b049e9969..368b3b98a 100644 --- a/weaver/processes/utils.py +++ b/weaver/processes/utils.py @@ -222,6 +222,9 @@ def deploy_process_from_payload(payload, container, overwrite=False): process_info["owsContext"] = {"offering": {"content": {"href": str(reference)}}} elif isinstance(ows_context, dict): process_info["owsContext"] = ows_context + # bw-compat abstract/description (see: ProcessDeployment schema) + if "description" not in process_info or not process_info["description"]: + process_info["description"] = process_info.get("abstract", "") # FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112) try: diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 4e5b57acd..da3933dc9 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -8,6 +8,7 @@ if TYPE_CHECKING: import os import typing + from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union if hasattr(typing, "TypedDict"): from typing import TypedDict # pylint: disable=E0611,no-name-in-module @@ -117,5 +118,4 @@ # others DatetimeIntervalType = TypedDict("DatetimeIntervalType", - {"before": str, "after": str, - "match": str, }, total=False) + {"before": datetime, "after": datetime, "match": datetime}, total=False) diff --git a/weaver/wps_restapi/processes/processes.py b/weaver/wps_restapi/processes/processes.py index 4a73a993f..4d31cd85b 100644 --- a/weaver/wps_restapi/processes/processes.py +++ b/weaver/wps_restapi/processes/processes.py @@ -163,7 +163,7 @@ def get_processes(request): for i, provider in enumerate(services): processes = list_remote_processes(provider, request) response_body["providers"][i].update({ - "processes": processes if detail else [get_any_id(proc) for proc in processes.json()] + "processes": processes if detail else [get_any_id(proc) for proc in processes] }) return HTTPOk(json=response_body) # FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 86974c9f5..cd3efec7f 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -558,9 +558,9 @@ class OWSContext(ExtendedMappingSchema): class DescriptionBase(ExtendedMappingSchema): - title = ExtendedSchemaNode(String(), missing=drop, description="Short name definition of the process.") - abstract = ExtendedSchemaNode(String(), missing=drop, description="Detailed explanation of the process operation.") - links = LinkList(missing=drop, description="References to endpoints with information related to the process.") + title = ExtendedSchemaNode(String(), missing=drop, description="Short human-readable name of the object.") + description = ExtendedSchemaNode(String(), missing=drop, description="Detailed explanation of the object.") + links = LinkList(missing=drop, description="References to endpoints with information related to object.") class DescriptionOWS(ExtendedMappingSchema): @@ -1869,21 +1869,17 @@ class OutputList(ExtendedSequenceSchema): output = Output() -class ProviderSummarySchema(ExtendedMappingSchema): +class ProviderSummarySchema(DescriptionType): """WPS provider summary definition.""" id = ExtendedSchemaNode(String()) url = URL(description="Endpoint of the provider.") - title = ExtendedSchemaNode(String()) - abstract = ExtendedSchemaNode(String()) public = ExtendedSchemaNode(Boolean()) -class ProviderCapabilitiesSchema(ExtendedMappingSchema): +class ProviderCapabilitiesSchema(DescriptionType): """WPS provider capabilities.""" id = ExtendedSchemaNode(String()) url = URL(description="WPS GetCapabilities URL of the provider.") - title = ExtendedSchemaNode(String()) - abstract = ExtendedSchemaNode(String()) contact = ExtendedSchemaNode(String()) type = ExtendedSchemaNode(String()) @@ -1922,13 +1918,19 @@ class ProcessInfo(ExtendedMappingSchema): executeEndpoint = URL(description="Endpoint where the process can be executed from.", missing=drop) -class Process(ProcessInfo, ProcessDescriptionType, ProcessDescriptionMeta): +class Process(ProcessSummary, ProcessInfo): inputs = InputTypeList(description="Inputs definition of the process.") outputs = OutputDescriptionList(description="Outputs definition of the process.") visibility = VisibilityValue(missing=drop) class ProcessDeployment(ProcessDescriptionType, ProcessDeployMeta): + # explicit "abstract" handling for bw-compat, new versions should use "description" + # only allowed in deploy to support older servers that report abstract (or parsed from WPS-1/2) + # recent OGC-API v1+ will usually provide directly "description" as per the specification + abstract = ExtendedSchemaNode(String(), missing=drop, deprecated=True, + description="Detailed explanation of the process being deployed. " + "[Deprecated] Consider using 'description' instead.") # allowed undefined I/O during deploy because of reference from owsContext or executionUnit inputs = InputTypeList( missing=drop, title="DeploymentInputs", @@ -1945,11 +1947,11 @@ class ProcessDeployment(ProcessDescriptionType, ProcessDeployMeta): class ProcessOutputDescriptionSchema(ExtendedMappingSchema): """WPS process output definition.""" - dataType = ExtendedSchemaNode(String()) - defaultValue = ExtendedMappingSchema() id = ExtendedSchemaNode(String()) - abstract = ExtendedSchemaNode(String()) title = ExtendedSchemaNode(String()) + abstract = ExtendedSchemaNode(String()) + dataType = ExtendedSchemaNode(String()) + defaultValue = ExtendedMappingSchema() class JobStatusInfo(ExtendedMappingSchema): @@ -2629,8 +2631,14 @@ class DeploymentResult(ExtendedMappingSchema): processSummary = ProcessSummary() -class ProcessDescriptionBodySchema(ExtendedMappingSchema): - process = ProcessDescriptionSchema() +# CHANGE: +# 'process' contents is now directly returned at the root of the 'ProcessDescription' response +# 'ProcessSummary' contents are also provided on top of items in 'ProcessDescription' body +# +# class ProcessDescriptionBodySchema(ExtendedMappingSchema): +# process = ProcessDescriptionSchema() +class ProcessDescriptionBodySchema(ProcessSummary, ProcessDescriptionSchema): + pass class ProvidersSchema(ExtendedSequenceSchema): @@ -2823,8 +2831,19 @@ class ProcessOfferingBase(ExtendedMappingSchema): outputTransmission = TransmissionModeList(missing=drop) -class ProcessOffering(ProcessOfferingBase): - process = Process() +class ProcessOffering(Process): + # CHANGE: process contents now directly at the root + # process = Process() + + def deserialize(self, cstruct): + """ + Define preferred key ordering. + """ + result = super(ProcessOffering, self).deserialize(cstruct) + key_order = ["id", "title", "version", "description", "keywords", "metadata", "inputs", "outputs", "links"] + offering = {field: result.pop(field, None) for field in key_order} + offering.update({field: value for field, value in result.items()}) + return offering class ProcessDeploymentOffering(ProcessOfferingBase): @@ -3570,7 +3589,9 @@ def service_api_route_info(service_api, settings): def datetime_interval_parser(datetime_interval): # type: (str) -> DatetimeIntervalType - """This function parses a given datetime or interval into a dictionary that will be easy for database process.""" + """ + This function parses a given datetime or interval into a dictionary that will be easy for database process. + """ parsed_datetime = {} if datetime_interval.startswith(DATETIME_INTERVAL_OPEN_START_SYMBOL): diff --git a/weaver/wps_restapi/utils.py b/weaver/wps_restapi/utils.py index a1956ff74..20b30cea6 100644 --- a/weaver/wps_restapi/utils.py +++ b/weaver/wps_restapi/utils.py @@ -24,4 +24,4 @@ def get_wps_restapi_base_url(container): weaver_url = get_weaver_url(settings) restapi_path = wps_restapi_base_path(settings) weaver_rest_url = weaver_url + restapi_path - return weaver_rest_url + return weaver_rest_url.rstrip("/").strip()