Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(set_workflow_status): publish workflows to submission queue (#691) #691

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -3599,6 +3599,11 @@
},
{
"description": "Required. New workflow status.",
"enum": [
"start",
"stop",
"deleted"
],
"in": "query",
"name": "status",
"required": true,
Expand All @@ -3612,19 +3617,30 @@
"type": "string"
},
{
"description": "Optional. Additional input parameters and operational options.",
"description": "Optional. Additional parameters to customise the workflow status change.",
"in": "body",
"name": "parameters",
"required": false,
"schema": {
"properties": {
"CACHE": {
"type": "string"
},
"all_runs": {
"description": "Optional. If true, delete all runs of the workflow. Only allowed when status is `deleted`.",
"type": "boolean"
},
"input_parameters": {
"description": "Optional. Additional input parameters that override the ones from the workflow specification. Only allowed when status is `start`.",
"type": "object"
},
"operational_options": {
"description": "Optional. Additional operational options for workflow execution. Only allowed when status is `start`.",
"type": "object"
},
"restart": {
"description": "Optional. If true, the workflow is a restart of an earlier workflow execution. Only allowed when status is `start`.",
"type": "boolean"
},
"workspace": {
"description": "Optional, but must be set to true if provided. If true, delete also the workspace of the workflow. Only allowed when status is `deleted`.",
"type": "boolean"
}
},
Expand Down
194 changes: 125 additions & 69 deletions reana_server/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,75 @@ def get_workflow_status(workflow_id_or_name, user): # noqa
return jsonify({"message": str(e)}), 500


def _start_workflow(workflow_id_or_name, user, **parameters):
"""Start given workflow by publishing it to the submission queue.

This function is used by both the `set_workflow_status` and `start_workflow`.
"""
operational_options = parameters.get("operational_options", {})
input_parameters = parameters.get("input_parameters", {})
restart = parameters.get("restart", False)
reana_specification = parameters.get("reana_specification")

try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")

workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_))
operational_options = validate_operational_options(
workflow.type_, operational_options
)

restart_type = None
if restart:
if workflow.status not in [RunStatus.finished, RunStatus.failed]:
raise ValueError("Only finished or failed workflows can be restarted.")
if workflow.workspace_has_pending_retention_rules():
raise ValueError(
"The workflow cannot be restarted because some retention rules are "
"currently being applied to the workspace. Please retry later."
)
if reana_specification:
restart_type = reana_specification.get("workflow", {}).get("type", None)
workflow = clone_workflow(workflow, reana_specification, restart_type)
elif workflow.status != RunStatus.created:
raise ValueError(
"Workflow {} is already {} and cannot be started "
"again.".format(workflow.get_full_workflow_name(), workflow.status.name)
)
if "yadage" in (workflow.type_, restart_type):
_load_and_save_yadage_spec(workflow, operational_options)

validate_workflow(
workflow.reana_specification, input_parameters=input_parameters
)

# when starting the workflow, the scheduler will call RWC's `set_workflow_status`
# with the given `parameters`
publish_workflow_submission(workflow, user.id_, parameters)
response = {
"message": "Workflow submitted.",
"workflow_id": workflow.id_,
"workflow_name": workflow.name,
"status": RunStatus.queued.name,
"run_number": workflow.run_number,
"user": str(user.id_),
}
return response, 200
except HTTPError as e:
logging.error(traceback.format_exc())
return e.response.json(), e.response.status_code
except (REANAValidationError, ValidationError) as e:
logging.error(traceback.format_exc())
return {"message": str(e)}, 400
except ValueError as e:
logging.error(traceback.format_exc())
return {"message": str(e)}, 403
except Exception as e:
logging.error(traceback.format_exc())
return {"message": str(e)}, 500


@blueprint.route("/workflows/<workflow_id_or_name>/start", methods=["POST"])
@signin_required()
@use_kwargs(
Expand Down Expand Up @@ -1302,74 +1371,25 @@ def start_workflow(workflow_id_or_name, user, **parameters): # noqa
"message": "Status resume is not supported yet."
}
"""

operational_options = parameters.get("operational_options", {})
input_parameters = parameters.get("input_parameters", {})
restart = parameters.get("restart", False)
reana_specification = parameters.get("reana_specification")

try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")

workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_))
operational_options = validate_operational_options(
workflow.type_, operational_options
)

restart_type = None
if restart:
if workflow.status not in [RunStatus.finished, RunStatus.failed]:
raise ValueError("Only finished or failed workflows can be restarted.")
if workflow.workspace_has_pending_retention_rules():
raise ValueError(
"The workflow cannot be restarted because some retention rules are "
"currently being applied to the workspace. Please retry later."
)
if reana_specification:
restart_type = reana_specification.get("workflow", {}).get("type", None)
workflow = clone_workflow(workflow, reana_specification, restart_type)
elif workflow.status != RunStatus.created:
raise ValueError(
"Workflow {} is already {} and cannot be started "
"again.".format(workflow.get_full_workflow_name(), workflow.status.name)
)
if "yadage" in (workflow.type_, restart_type):
_load_and_save_yadage_spec(workflow, operational_options)

validate_workflow(
workflow.reana_specification, input_parameters=input_parameters
)

# when starting the workflow, the scheduler will call RWC's `set_workflow_status`
# with the given `parameters`
publish_workflow_submission(workflow, user.id_, parameters)
response = {
"message": "Workflow submitted.",
"workflow_id": workflow.id_,
"workflow_name": workflow.name,
"status": RunStatus.queued.name,
"run_number": workflow.run_number,
"user": str(user.id_),
}
return jsonify(response), 200
except HTTPError as e:
logging.error(traceback.format_exc())
return jsonify(e.response.json()), e.response.status_code
except (REANAValidationError, ValidationError) as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 400
except ValueError as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 403
except Exception as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 500
response, status_code = _start_workflow(workflow_id_or_name, user, **parameters)
return jsonify(response), status_code


@blueprint.route("/workflows/<workflow_id_or_name>/status", methods=["PUT"])
@signin_required()
def set_workflow_status(workflow_id_or_name, user): # noqa
@use_kwargs(
{
"status": fields.Str(required=True, location="query"),
# parameters for "start"
"input_parameters": fields.Dict(location="json"),
"operational_options": fields.Dict(location="json"),
"restart": fields.Boolean(location="json"),
# parameters for "deleted"
"all_runs": fields.Boolean(location="json"),
"workspace": fields.Boolean(location="json"),
}
)
def set_workflow_status(workflow_id_or_name, user, status, **parameters): # noqa
r"""Set workflow status.
---
put:
Expand All @@ -1393,6 +1413,10 @@ def set_workflow_status(workflow_id_or_name, user): # noqa
description: Required. New workflow status.
required: true
type: string
enum:
- start
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this may look a bit inconsistent, it would be preferable to use either "start/stop/delete" (verbs) or "started/stopped/deleted" (nouns) everywhere ... but I guess this may not be for now.

(... for the API 1.0 stabilisation we could also clean mixing params/options for execution with all-runs/workspace for deletion...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- stop
- deleted
- name: access_token
in: query
description: The API access_token of workflow owner.
Expand All @@ -1401,18 +1425,37 @@ def set_workflow_status(workflow_id_or_name, user): # noqa
- name: parameters
in: body
description: >-
Optional. Additional input parameters and operational options.
Optional. Additional parameters to customise the workflow status change.
required: false
schema:
type: object
properties:
CACHE:
type: string
operational_options:
description: >-
Optional. Additional operational options for workflow execution.
Only allowed when status is `start`.
type: object
input_parameters:
description: >-
Optional. Additional input parameters that override the ones
from the workflow specification. Only allowed when status is `start`.
type: object
restart:
description: >-
Optional. If true, the workflow is a restart of an earlier workflow execution.
Only allowed when status is `start`.
type: boolean
all_runs:
description: >-
Optional. If true, delete all runs of the workflow.
Only allowed when status is `deleted`.
type: boolean
workspace:
description: >-
Optional, but must be set to true if provided.
If true, delete also the workspace of the workflow.
Only allowed when status is `deleted`.
type: boolean

responses:
200:
description: >-
Expand Down Expand Up @@ -1528,7 +1571,20 @@ def set_workflow_status(workflow_id_or_name, user): # noqa
try:
if not workflow_id_or_name:
raise ValueError("workflow_id_or_name is not supplied")
status = request.args.get("status")

if status == "start":
# We can't call directly RWC when starting a workflow, as otherwise
# the workflow would skip the queue. Instead, we do what the
# `start_workflow` endpoint does.
response, status_code = _start_workflow(
workflow_id_or_name, user, **parameters
)
if "run_number" in response:
# run_number is returned by `start_workflow`,
# but not by `set_status_workflow`
del response["run_number"]
return jsonify(response), status_code

parameters = request.json if request.is_json else None
response, http_response = current_rwc_api_client.api.set_workflow_status(
user=str(user.id_),
Expand Down
4 changes: 2 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def test_get_workflow_status(app, default_user, _get_user_mock):


def test_set_workflow_status(app, default_user, _get_user_mock):
"""Test get_workflow_logs view."""
"""Test set_workflow_status view."""
with app.test_client() as client:
with patch(
"reana_server.rest.workflows.current_rwc_api_client",
Expand All @@ -341,7 +341,7 @@ def test_set_workflow_status(app, default_user, _get_user_mock):
headers={"Content-Type": "application/json"},
query_string={"access_token": default_user.access_token},
)
assert res.status_code == 500
assert res.status_code == 422

res = client.put(
url_for("workflows.set_workflow_status", workflow_id_or_name="1"),
Expand Down
Loading