Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAUP-765: add user and wsid to job output #3386

Merged
merged 1 commit into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 70 additions & 57 deletions src/biokbase/narrative/jobs/appmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import random
import re
import traceback
from typing import Callable, Optional
from collections.abc import Callable
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all changes in this file are adding typing

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the switch from using Optional?

Copy link
Collaborator Author

@ialarmedalien ialarmedalien Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ruff likes type | None rather than Optional[type]

from typing import Any

from biokbase import auth
from biokbase.narrative import clients
Expand All @@ -19,14 +20,12 @@
)
from biokbase.narrative.common import kblogging
from biokbase.narrative.exception_util import transform_job_exception
from biokbase.narrative.system import strict_system_variable, system_variable

from biokbase.narrative.widgetmanager import WidgetManager

from biokbase.narrative.jobs import specmanager
from biokbase.narrative.jobs.job import Job
from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE, JobComm
from biokbase.narrative.jobs.jobmanager import JobManager
from biokbase.narrative.system import strict_system_variable, system_variable
from biokbase.narrative.widgetmanager import WidgetManager

"""
A module for managing apps, specs, requirements, and for starting jobs.
Expand All @@ -45,7 +44,7 @@ def timestamp() -> str:
return datetime.datetime.utcnow().isoformat() + "Z"


def _app_error_wrapper(app_func: Callable) -> any:
def _app_error_wrapper(app_func: Callable) -> Callable:
"""
This is a decorator meant to wrap any of the `run_app*` methods here.
It captures any raised exception, formats it into a message that can be sent
Expand Down Expand Up @@ -122,15 +121,15 @@ def __new__(cls):
AppManager.__instance._comm = None
return AppManager.__instance

def reload(self):
def reload(self: "AppManager"):
"""
Reloads all app specs into memory from the App Catalog.
Any outputs of app_usage, app_description, or available_apps
should be run again after the update.
"""
self.spec_manager.reload()

def app_usage(self, app_id, tag="release"):
def app_usage(self: "AppManager", app_id: str, tag: str = "release"):
"""
This shows the list of inputs and outputs for a given app with a given
tag. By default, this is done in a pretty HTML way, but this app can be
Expand All @@ -149,7 +148,7 @@ def app_usage(self, app_id, tag="release"):
"""
return self.spec_manager.app_usage(app_id, tag)

def app_description(self, app_id, tag="release"):
def app_description(self: "AppManager", app_id: str, tag: str = "release"):
"""
Returns the app description in a printable HTML format.

Expand All @@ -166,7 +165,7 @@ def app_description(self, app_id, tag="release"):
"""
return self.spec_manager.app_description(app_id, tag)

def available_apps(self, tag="release"):
def available_apps(self: "AppManager", tag: str = "release"):
"""
Lists the set of available apps for a given tag in a simple table.
If the tag is not found, a ValueError will be raised.
Expand All @@ -181,13 +180,13 @@ def available_apps(self, tag="release"):

@_app_error_wrapper
def run_legacy_batch_app(
self,
app_id,
self: "AppManager",
app_id: str,
params,
tag="release",
version=None,
cell_id=None,
run_id=None,
tag: str = "release",
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
dry_run=False,
):
if params is None:
Expand Down Expand Up @@ -320,15 +319,15 @@ def run_legacy_batch_app(

@_app_error_wrapper
def run_app(
self,
app_id,
self: "AppManager",
app_id: str,
params,
tag="release",
version=None,
cell_id=None,
run_id=None,
tag: str = "release",
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
dry_run=False,
):
) -> dict[str, Any]:
"""
Attempts to run the app, returns a Job with the running app info.
If this is given a cell_id, then returns None. If not, it returns the
Expand Down Expand Up @@ -409,12 +408,12 @@ def run_app(

@_app_error_wrapper
def run_app_batch(
self,
self: "AppManager",
app_info: list,
cell_id: str = None,
run_id: str = None,
cell_id: str | None = None,
run_id: str | None = None,
dry_run: bool = False,
) -> Optional[dict]:
) -> None | dict[str, Job | list[Job]] | dict[str, dict[str, str | int] | list]:
"""
Attempts to run a batch of apps in bulk using the Execution Engine's run_app_batch endpoint.
If a cell_id is provided, this sends various job messages over the comm channel, and returns None.
Expand Down Expand Up @@ -554,7 +553,7 @@ def run_app_batch(
},
)

child_jobs = Job.from_job_ids(child_ids, return_list=True)
child_jobs = Job.from_job_ids(child_ids)
parent_job = Job.from_job_id(
batch_id,
children=child_jobs,
Expand All @@ -568,7 +567,7 @@ def run_app_batch(
if cell_id is None:
return {"parent_job": parent_job, "child_jobs": child_jobs}

def _validate_bulk_app_info(self, app_info: dict):
def _validate_bulk_app_info(self: "AppManager", app_info: dict):
"""
Validation consists of:
1. must have "app_id" with format xyz/abc
Expand Down Expand Up @@ -610,7 +609,9 @@ def _validate_bulk_app_info(self, app_info: dict):
f"an app version must be a string, not {app_info['version']}"
)

def _reconstitute_shared_params(self, app_info_el: dict) -> None:
def _reconstitute_shared_params(
self: "AppManager", app_info_el: dict[str, Any]
) -> None:
"""
Mutate each params dict to include any shared_params
app_info_el is structured like:
Expand Down Expand Up @@ -639,14 +640,14 @@ def _reconstitute_shared_params(self, app_info_el: dict) -> None:
param_set.setdefault(k, v)

def _build_run_job_params(
self,
spec: dict,
self: "AppManager",
spec: dict[str, Any],
tag: str,
param_set: dict,
version: Optional[str] = None,
cell_id: Optional[str] = None,
run_id: Optional[str] = None,
ws_id: Optional[int] = None,
param_set: dict[str, Any],
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
ws_id: int | None = None,
) -> dict:
"""
Builds the set of inputs for EE2.run_job and EE2.run_job_batch (RunJobParams) given a spec
Expand Down Expand Up @@ -726,13 +727,13 @@ def _build_run_job_params(

@_app_error_wrapper
def run_local_app(
self,
app_id,
params,
tag="release",
version=None,
cell_id=None,
run_id=None,
self: "AppManager",
app_id: str,
params: dict[str, Any],
tag: str = "release",
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
widget_state=None,
):
"""
Expand Down Expand Up @@ -811,14 +812,14 @@ def run_local_app(
)

def run_local_app_advanced(
self,
app_id,
params,
self: "AppManager",
app_id: str,
params: dict[str, Any],
widget_state,
tag="release",
version=None,
cell_id=None,
run_id=None,
tag: str = "release",
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
):
return self.run_local_app(
app_id,
Expand All @@ -830,7 +831,9 @@ def run_local_app_advanced(
run_id=run_id,
)

def _get_validated_app_spec(self, app_id, tag, is_long, version=None):
def _get_validated_app_spec(
self: "AppManager", app_id: str, tag: str, is_long, version: str | None = None
):
if (
version is not None
and tag != "release"
Expand Down Expand Up @@ -860,7 +863,12 @@ def _get_validated_app_spec(self, app_id, tag, is_long, version=None):
)
return spec

def _map_group_inputs(self, value, spec_param, spec_params):
def _map_group_inputs(
self: "AppManager",
value: list | dict | None,
spec_param: dict[str, Any],
spec_params: dict[str, Any],
):
if isinstance(value, list):
return [self._map_group_inputs(v, spec_param, spec_params) for v in value]

Expand Down Expand Up @@ -891,7 +899,12 @@ def _map_group_inputs(self, value, spec_param, spec_params):
mapped_value[target_key] = target_val
return mapped_value

def _map_inputs(self, input_mapping, params, spec_params):
def _map_inputs(
self: "AppManager",
input_mapping: list[dict[str, Any]],
params: dict[str, Any],
spec_params: dict[str, Any],
):
"""
Maps the dictionary of parameters and inputs based on rules provided in
the input_mapping. This iterates over the list of input_mappings, and
Expand Down Expand Up @@ -977,7 +990,7 @@ def _map_inputs(self, input_mapping, params, spec_params):
inputs_list.append(inputs_dict[k])
return inputs_list

def _generate_input(self, generator):
def _generate_input(self: "AppManager", generator: dict[str, Any] | None):
"""
Generates an input value using rules given by
NarrativeMethodStore.AutoGeneratedValue.
Expand Down Expand Up @@ -1007,10 +1020,10 @@ def _generate_input(self, generator):
return ret + str(generator["suffix"])
return ret

def _send_comm_message(self, msg_type, content):
def _send_comm_message(self: "AppManager", msg_type: str, content: dict[str, Any]):
JobComm().send_comm_message(msg_type, content)

def _get_agent_token(self, name: str) -> auth.TokenInfo:
def _get_agent_token(self: "AppManager", name: str) -> auth.TokenInfo:
"""
Retrieves an agent token from the Auth service with a formatted name.
This prepends "KBApp_" to the name for filtering, and trims to make sure the name
Expand Down
Loading
Loading