Skip to content

Commit

Permalink
fix(set_workflow_status): publish workflows to submission queue (#691)
Browse files Browse the repository at this point in the history
When starting a new workflow, publish the workflow to the submission
queue instead of executing the workflow immediately by calling
`set_workflow_status` in workflow-controller.

Closes #690
  • Loading branch information
mdonadoni committed Jul 2, 2024
1 parent 2199256 commit 6378451
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 75 deletions.
24 changes: 20 additions & 4 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -3599,6 +3599,11 @@
},
{
"description": "Required. New workflow status.",
"enum": [
"start",
"stop",
"deleted"
],
"in": "query",
"name": "status",
"required": true,
Expand All @@ -3612,19 +3617,30 @@
"type": "string"
},
{
"description": "Optional. Additional input parameters and operational options.",
"description": "Optional. Additional parameters to customise the change of workflow status.",
"in": "body",
"name": "parameters",
"required": false,
"schema": {
"properties": {
"CACHE": {
"type": "string"
},
"all_runs": {
"description": "Optional. If true, delete all runs of the workflow. Only allowed when status is `deleted`.",
"type": "boolean"
},
"input_parameters": {
"description": "Optional. Additional input parameters that override the ones in the workflow specification. Only allowed when status is `start`.",
"type": "object"
},
"operational_options": {
"description": "Optional. Operational options for workflow execution. Only allowed when status is `start`.",
"type": "object"
},
"restart": {
"description": "Optional. If true, the workflow is a restart of another one. Only allowed when status is `start`.",
"type": "boolean"
},
"workspace": {
"description": "Optional, but must be set to true if provided. If true, delete also the workspace of the workflow. Only allowed when status is `deleted`.",
"type": "boolean"
}
},
Expand Down
194 changes: 125 additions & 69 deletions reana_server/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,75 @@ def get_workflow_status(workflow_id_or_name, user): # noqa
return jsonify({"message": str(e)}), 500


def _start_workflow(workflow_id_or_name, user, **parameters):
"""Start given workflow by publishing it to the submission queue.
This function is used by both the `set_workflow_status` and `start_workflow`.
"""
operational_options = parameters.get("operational_options", {})
input_parameters = parameters.get("input_parameters", {})
restart = parameters.get("restart", False)
reana_specification = parameters.get("reana_specification")

try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")

Check warning on line 1146 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1146

Added line #L1146 was not covered by tests

workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_))
operational_options = validate_operational_options(
workflow.type_, operational_options
)

restart_type = None
if restart:
if workflow.status not in [RunStatus.finished, RunStatus.failed]:
raise ValueError("Only finished or failed workflows can be restarted.")

Check warning on line 1156 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1156

Added line #L1156 was not covered by tests
if workflow.workspace_has_pending_retention_rules():
raise ValueError(

Check warning on line 1158 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1158

Added line #L1158 was not covered by tests
"The workflow cannot be restarted because some retention rules are "
"currently being applied to the workspace. Please retry later."
)
if reana_specification:
restart_type = reana_specification.get("workflow", {}).get("type", None)
workflow = clone_workflow(workflow, reana_specification, restart_type)
elif workflow.status != RunStatus.created:
raise ValueError(

Check warning on line 1166 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1166

Added line #L1166 was not covered by tests
"Workflow {} is already {} and cannot be started "
"again.".format(workflow.get_full_workflow_name(), workflow.status.name)
)
if "yadage" in (workflow.type_, restart_type):
_load_and_save_yadage_spec(workflow, operational_options)

Check warning on line 1171 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1171

Added line #L1171 was not covered by tests

validate_workflow(
workflow.reana_specification, input_parameters=input_parameters
)

# when starting the workflow, the scheduler will call RWC's `set_workflow_status`
# with the given `parameters`
publish_workflow_submission(workflow, user.id_, parameters)
response = {

Check warning on line 1180 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1179-L1180

Added lines #L1179 - L1180 were not covered by tests
"message": "Workflow submitted.",
"workflow_id": workflow.id_,
"workflow_name": workflow.name,
"status": RunStatus.queued.name,
"run_number": workflow.run_number,
"user": str(user.id_),
}
return response, 200

Check warning on line 1188 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1188

Added line #L1188 was not covered by tests
except HTTPError as e:
logging.error(traceback.format_exc())
return e.response.json(), e.response.status_code

Check warning on line 1191 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1190-L1191

Added lines #L1190 - L1191 were not covered by tests
except (REANAValidationError, ValidationError) as e:
logging.error(traceback.format_exc())
return {"message": str(e)}, 400
except ValueError as e:
logging.error(traceback.format_exc())
return {"message": str(e)}, 403
except Exception as e:
logging.error(traceback.format_exc())
return {"message": str(e)}, 500

Check warning on line 1200 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1195-L1200

Added lines #L1195 - L1200 were not covered by tests


@blueprint.route("/workflows/<workflow_id_or_name>/start", methods=["POST"])
@signin_required()
@use_kwargs(
Expand Down Expand Up @@ -1302,74 +1371,25 @@ def start_workflow(workflow_id_or_name, user, **parameters): # noqa
"message": "Status resume is not supported yet."
}
"""

operational_options = parameters.get("operational_options", {})
input_parameters = parameters.get("input_parameters", {})
restart = parameters.get("restart", False)
reana_specification = parameters.get("reana_specification")

try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")

workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_))
operational_options = validate_operational_options(
workflow.type_, operational_options
)

restart_type = None
if restart:
if workflow.status not in [RunStatus.finished, RunStatus.failed]:
raise ValueError("Only finished or failed workflows can be restarted.")
if workflow.workspace_has_pending_retention_rules():
raise ValueError(
"The workflow cannot be restarted because some retention rules are "
"currently being applied to the workspace. Please retry later."
)
if reana_specification:
restart_type = reana_specification.get("workflow", {}).get("type", None)
workflow = clone_workflow(workflow, reana_specification, restart_type)
elif workflow.status != RunStatus.created:
raise ValueError(
"Workflow {} is already {} and cannot be started "
"again.".format(workflow.get_full_workflow_name(), workflow.status.name)
)
if "yadage" in (workflow.type_, restart_type):
_load_and_save_yadage_spec(workflow, operational_options)

validate_workflow(
workflow.reana_specification, input_parameters=input_parameters
)

# when starting the workflow, the scheduler will call RWC's `set_workflow_status`
# with the given `parameters`
publish_workflow_submission(workflow, user.id_, parameters)
response = {
"message": "Workflow submitted.",
"workflow_id": workflow.id_,
"workflow_name": workflow.name,
"status": RunStatus.queued.name,
"run_number": workflow.run_number,
"user": str(user.id_),
}
return jsonify(response), 200
except HTTPError as e:
logging.error(traceback.format_exc())
return jsonify(e.response.json()), e.response.status_code
except (REANAValidationError, ValidationError) as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 400
except ValueError as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 403
except Exception as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 500
response, status_code = _start_workflow(workflow_id_or_name, user, **parameters)
return jsonify(response), status_code


@blueprint.route("/workflows/<workflow_id_or_name>/status", methods=["PUT"])
@signin_required()
def set_workflow_status(workflow_id_or_name, user): # noqa
@use_kwargs(
{
"status": fields.Str(required=True, location="query"),
# parameters for "start"
"input_parameters": fields.Dict(location="json"),
"operational_options": fields.Dict(location="json"),
"restart": fields.Boolean(location="json"),
# parameters for "deleted"
"all_runs": fields.Boolean(location="json"),
"workspace": fields.Boolean(location="json"),
}
)
def set_workflow_status(workflow_id_or_name, user, status, **parameters): # noqa
r"""Set workflow status.
---
put:
Expand All @@ -1393,6 +1413,10 @@ def set_workflow_status(workflow_id_or_name, user): # noqa
description: Required. New workflow status.
required: true
type: string
enum:
- start
- stop
- deleted
- name: access_token
in: query
description: The API access_token of workflow owner.
Expand All @@ -1401,18 +1425,37 @@ def set_workflow_status(workflow_id_or_name, user): # noqa
- name: parameters
in: body
description: >-
Optional. Additional input parameters and operational options.
Optional. Additional parameters to customise the change of workflow status.
required: false
schema:
type: object
properties:
CACHE:
type: string
operational_options:
description: >-
Optional. Operational options for workflow execution.
Only allowed when status is `start`.
type: object
input_parameters:
description: >-
Optional. Additional input parameters that override the ones
in the workflow specification. Only allowed when status is `start`.
type: object
restart:
description: >-
Optional. If true, the workflow is a restart of another one.
Only allowed when status is `start`.
type: boolean
all_runs:
description: >-
Optional. If true, delete all runs of the workflow.
Only allowed when status is `deleted`.
type: boolean
workspace:
description: >-
Optional, but must be set to true if provided.
If true, delete also the workspace of the workflow.
Only allowed when status is `deleted`.
type: boolean
responses:
200:
description: >-
Expand Down Expand Up @@ -1528,7 +1571,20 @@ def set_workflow_status(workflow_id_or_name, user): # noqa
try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")
status = request.args.get("status")

if status == "start":
# We can't call directly RWC when starting a workflow, as otherwise
# the workflow would skip the queue. Instead, we do what the
# `start_workflow` endpoint does.
response, status_code = _start_workflow(

Check warning on line 1579 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1579

Added line #L1579 was not covered by tests
workflow_id_or_name, user, **parameters
)
if "run_number" in response:

Check warning on line 1582 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1582

Added line #L1582 was not covered by tests
# run_number is returned by `start_workflow`,
# but not by `set_status_workflow`
del response["run_number"]
return jsonify(response), status_code

Check warning on line 1586 in reana_server/rest/workflows.py

View check run for this annotation

Codecov / codecov/patch

reana_server/rest/workflows.py#L1585-L1586

Added lines #L1585 - L1586 were not covered by tests

parameters = request.json if request.is_json else None
response, http_response = current_rwc_api_client.api.set_workflow_status(
user=str(user.id_),
Expand Down
4 changes: 2 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def test_get_workflow_status(app, default_user, _get_user_mock):


def test_set_workflow_status(app, default_user, _get_user_mock):
"""Test get_workflow_logs view."""
"""Test set_workflow_status view."""
with app.test_client() as client:
with patch(
"reana_server.rest.workflows.current_rwc_api_client",
Expand All @@ -341,7 +341,7 @@ def test_set_workflow_status(app, default_user, _get_user_mock):
headers={"Content-Type": "application/json"},
query_string={"access_token": default_user.access_token},
)
assert res.status_code == 500
assert res.status_code == 422

res = client.put(
url_for("workflows.set_workflow_status", workflow_id_or_name="1"),
Expand Down

0 comments on commit 6378451

Please sign in to comment.