Skip to content

Commit

Permalink
fix(set_workflow_status): validate endpoint arguments (reanahub#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdonadoni committed Jul 3, 2024
1 parent 784efee commit 5945d7f
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 45 deletions.
19 changes: 15 additions & 4 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1116,19 +1116,30 @@
"type": "string"
},
{
"description": "Optional. Additional input parameters and operational options for workflow execution. Possible parameters are `CACHE=on/off`, passed to disable caching of results in serial workflows, `all_runs=True/False` deletes all runs of a given workflow if status is set to deleted and `workspace=True/False` which deletes the workspace of a workflow.",
"description": "Optional. Additional parameters to customise the workflow status change.",
"in": "body",
"name": "parameters",
"required": false,
"schema": {
"properties": {
"CACHE": {
"type": "string"
},
"all_runs": {
"description": "Optional. If true, delete all runs of the workflow. Only allowed when status is `deleted`.",
"type": "boolean"
},
"input_parameters": {
"description": "Optional. Additional input parameters that override the ones from the workflow specification. Only allowed when status is `start`.",
"type": "object"
},
"operational_options": {
"description": "Optional. Additional operational options for workflow execution. Only allowed when status is `start`.",
"type": "object"
},
"restart": {
"description": "Optional. If true, the workflow is a restart of an earlier workflow execution. Only allowed when status is `start`.",
"type": "boolean"
},
"workspace": {
"description": "Optional, but must be set to true if provided. If true, delete also the workspace of the workflow. Only allowed when status is `deleted`.",
"type": "boolean"
}
},
Expand Down
23 changes: 11 additions & 12 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ def start_workflow(workflow, parameters):

def _start_workflow_db(workflow, parameters):
workflow.status = RunStatus.pending
if parameters:
workflow.input_parameters = parameters.get("input_parameters")
workflow.operational_options = parameters.get("operational_options")
workflow.input_parameters = parameters.get("input_parameters", {})
workflow.operational_options = parameters.get("operational_options", {})
current_db_sessions.add(workflow)
current_db_sessions.commit()

Expand All @@ -95,15 +94,15 @@ def _start_workflow_db(workflow, parameters):
verb=get_workflow_status_change_verb(workflow.status.name),
status=str(workflow.status.name),
)
if "restart" in parameters.keys():
if parameters["restart"]:
if workflow.status not in [
RunStatus.failed,
RunStatus.finished,
RunStatus.queued,
RunStatus.pending,
]:
raise REANAWorkflowControllerError(failure_message)

if parameters.get("restart"):
if workflow.status not in [
RunStatus.failed,
RunStatus.finished,
RunStatus.queued,
RunStatus.pending,
]:
raise REANAWorkflowControllerError(failure_message)
elif workflow.status not in [RunStatus.created, RunStatus.queued]:
if workflow.status == RunStatus.deleted:
raise REANAWorkflowStatusError(failure_message)
Expand Down
80 changes: 53 additions & 27 deletions reana_workflow_controller/rest/workflows_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import json

from flask import Blueprint, jsonify, request
from webargs import fields
from webargs.flaskparser import use_kwargs


from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_commons.errors import REANASecretDoesNotExist
Expand Down Expand Up @@ -314,7 +317,28 @@ def get_workflow_status(workflow_id_or_name): # noqa


@blueprint.route("/workflows/<workflow_id_or_name>/status", methods=["PUT"])
def set_workflow_status(workflow_id_or_name): # noqa
@use_kwargs(
{
# parameters for "start"
"input_parameters": fields.Dict(),
"operational_options": fields.Dict(),
"restart": fields.Boolean(),
# parameters for "deleted"
"all_runs": fields.Boolean(),
"workspace": fields.Boolean(),
},
location="json",
)
@use_kwargs(
{
"user": fields.Str(required=True),
"status": fields.Str(required=True),
},
location="query",
)
def set_workflow_status(
workflow_id_or_name: str, user: str, status: str, **parameters: dict
): # noqa
r"""Set workflow status.
---
Expand Down Expand Up @@ -348,21 +372,36 @@ def set_workflow_status(workflow_id_or_name): # noqa
- name: parameters
in: body
description: >-
Optional. Additional input parameters and operational options for
workflow execution. Possible parameters are `CACHE=on/off`, passed
to disable caching of results in serial workflows,
`all_runs=True/False` deletes all runs of a given workflow
if status is set to deleted and `workspace=True/False` which deletes
the workspace of a workflow.
Optional. Additional parameters to customise the workflow status change.
required: false
schema:
type: object
properties:
CACHE:
type: string
operational_options:
description: >-
Optional. Additional operational options for workflow execution.
Only allowed when status is `start`.
type: object
input_parameters:
description: >-
Optional. Additional input parameters that override the ones
from the workflow specification. Only allowed when status is `start`.
type: object
restart:
description: >-
Optional. If true, the workflow is a restart of an earlier workflow execution.
Only allowed when status is `start`.
type: boolean
all_runs:
description: >-
Optional. If true, delete all runs of the workflow.
Only allowed when status is `deleted`.
type: boolean
workspace:
description: >-
Optional, but must be set to true if provided.
If true, delete also the workspace of the workflow.
Only allowed when status is `deleted`.
type: boolean
responses:
200:
Expand Down Expand Up @@ -456,24 +495,11 @@ def set_workflow_status(workflow_id_or_name): # noqa
"""

try:
user_uuid = request.args["user"]
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid)
status = request.args.get("status")
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user)
if not (status in STATUSES):
return (
jsonify(
{
"message": "Status {0} is not one of: {1}".format(
status, ", ".join(STATUSES)
)
}
),
400,
)
error_msg = f"Status {status} is not one of: {', '.join(STATUSES)}"
return jsonify({"message": error_msg}), 400

parameters = {}
if request.is_json:
parameters = request.json
if status == START:
start_workflow(workflow, parameters)
return (
Expand All @@ -489,8 +515,8 @@ def set_workflow_status(workflow_id_or_name): # noqa
200,
)
elif status == DELETED:
all_runs = True if request.json.get("all_runs") else False
workspace = True if request.json.get("workspace", True) else False
all_runs = parameters.get("all_runs", False)
workspace = parameters.get("workspace", True)
if not workspace:
return (
jsonify(
Expand Down
49 changes: 47 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,9 @@ def test_set_workflow_status_unknown_workflow(
url_for(
"statuses.set_workflow_status", workflow_id_or_name=random_workflow_uuid
),
query_string={"user": default_user.id_},
query_string={"user": default_user.id_, "status": payload},
content_type="application/json",
data=json.dumps(payload),
data=json.dumps({}),
)
assert res.status_code == 404

Expand Down Expand Up @@ -1151,6 +1151,51 @@ def test_start_input_parameters(
assert workflow.input_parameters == parameters["input_parameters"]


def test_start_no_input_parameters(
app,
session,
default_user,
user_secrets,
corev1_api_client_with_user_secrets,
sample_serial_workflow_in_db,
):
"""Test start workflow with inupt parameters."""
workflow = sample_serial_workflow_in_db
workflow_uuid = str(sample_serial_workflow_in_db.id_)

with app.test_client() as client:
# create workflow
workflow.status = RunStatus.created
session.add(workflow)
session.commit()

payload = START
parameters = {"operational_options": {}}
with mock.patch(
"reana_workflow_controller.workflow_run_manager."
"current_k8s_batchv1_api_client"
):
# provide user secret store
with mock.patch(
"reana_commons.k8s.secrets.current_k8s_corev1_api_client",
corev1_api_client_with_user_secrets(user_secrets),
):
# set workflow status to START and pass parameters
res = client.put(
url_for(
"statuses.set_workflow_status",
workflow_id_or_name=workflow_uuid,
),
query_string={"user": default_user.id_, "status": "start"},
content_type="application/json",
data=json.dumps(parameters),
)
json_response = json.loads(res.data.decode())
assert json_response["status"] == status_dict[payload].name
workflow = Workflow.query.filter(Workflow.id_ == workflow_uuid).first()
assert workflow.input_parameters == dict()


def test_start_workflow_db_failure(
app,
session,
Expand Down

0 comments on commit 5945d7f

Please sign in to comment.