Skip to content

Commit

Permalink
Merge pull request #3386 from kbase/DATAUP-765_add_wsid_to_job
Browse files Browse the repository at this point in the history
DATAUP-765: add user and wsid to job output
  • Loading branch information
ialarmedalien committed Nov 16, 2023
2 parents a37bd72 + b982e48 commit 16b4c20
Show file tree
Hide file tree
Showing 14 changed files with 491 additions and 445 deletions.
127 changes: 70 additions & 57 deletions src/biokbase/narrative/jobs/appmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import random
import re
import traceback
from typing import Callable, Optional
from collections.abc import Callable
from typing import Any

from biokbase import auth
from biokbase.narrative import clients
Expand All @@ -19,14 +20,12 @@
)
from biokbase.narrative.common import kblogging
from biokbase.narrative.exception_util import transform_job_exception
from biokbase.narrative.system import strict_system_variable, system_variable

from biokbase.narrative.widgetmanager import WidgetManager

from biokbase.narrative.jobs import specmanager
from biokbase.narrative.jobs.job import Job
from biokbase.narrative.jobs.jobcomm import MESSAGE_TYPE, JobComm
from biokbase.narrative.jobs.jobmanager import JobManager
from biokbase.narrative.system import strict_system_variable, system_variable
from biokbase.narrative.widgetmanager import WidgetManager

"""
A module for managing apps, specs, requirements, and for starting jobs.
Expand All @@ -45,7 +44,7 @@ def timestamp() -> str:
return datetime.datetime.utcnow().isoformat() + "Z"


def _app_error_wrapper(app_func: Callable) -> any:
def _app_error_wrapper(app_func: Callable) -> Callable:
"""
This is a decorator meant to wrap any of the `run_app*` methods here.
It captures any raised exception, formats it into a message that can be sent
Expand Down Expand Up @@ -122,15 +121,15 @@ def __new__(cls):
AppManager.__instance._comm = None
return AppManager.__instance

def reload(self):
def reload(self: "AppManager"):
"""
Reloads all app specs into memory from the App Catalog.
Any outputs of app_usage, app_description, or available_apps
should be run again after the update.
"""
self.spec_manager.reload()

def app_usage(self, app_id, tag="release"):
def app_usage(self: "AppManager", app_id: str, tag: str = "release"):
"""
This shows the list of inputs and outputs for a given app with a given
tag. By default, this is done in a pretty HTML way, but this app can be
Expand All @@ -149,7 +148,7 @@ def app_usage(self, app_id, tag="release"):
"""
return self.spec_manager.app_usage(app_id, tag)

def app_description(self, app_id, tag="release"):
def app_description(self: "AppManager", app_id: str, tag: str = "release"):
"""
Returns the app description in a printable HTML format.
Expand All @@ -166,7 +165,7 @@ def app_description(self, app_id, tag="release"):
"""
return self.spec_manager.app_description(app_id, tag)

def available_apps(self, tag="release"):
def available_apps(self: "AppManager", tag: str = "release"):
"""
Lists the set of available apps for a given tag in a simple table.
If the tag is not found, a ValueError will be raised.
Expand All @@ -181,13 +180,13 @@ def available_apps(self, tag="release"):

@_app_error_wrapper
def run_legacy_batch_app(
self,
app_id,
self: "AppManager",
app_id: str,
params,
tag="release",
version=None,
cell_id=None,
run_id=None,
tag: str = "release",
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
dry_run=False,
):
if params is None:
Expand Down Expand Up @@ -320,15 +319,15 @@ def run_legacy_batch_app(

@_app_error_wrapper
def run_app(
self,
app_id,
self: "AppManager",
app_id: str,
params,
tag="release",
version=None,
cell_id=None,
run_id=None,
tag: str = "release",
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
dry_run=False,
):
) -> dict[str, Any]:
"""
Attempts to run the app, returns a Job with the running app info.
If this is given a cell_id, then returns None. If not, it returns the
Expand Down Expand Up @@ -409,12 +408,12 @@ def run_app(

@_app_error_wrapper
def run_app_batch(
self,
self: "AppManager",
app_info: list,
cell_id: str = None,
run_id: str = None,
cell_id: str | None = None,
run_id: str | None = None,
dry_run: bool = False,
) -> Optional[dict]:
) -> None | dict[str, Job | list[Job]] | dict[str, dict[str, str | int] | list]:
"""
Attempts to run a batch of apps in bulk using the Execution Engine's run_app_batch endpoint.
If a cell_id is provided, this sends various job messages over the comm channel, and returns None.
Expand Down Expand Up @@ -554,7 +553,7 @@ def run_app_batch(
},
)

child_jobs = Job.from_job_ids(child_ids, return_list=True)
child_jobs = Job.from_job_ids(child_ids)
parent_job = Job.from_job_id(
batch_id,
children=child_jobs,
Expand All @@ -568,7 +567,7 @@ def run_app_batch(
if cell_id is None:
return {"parent_job": parent_job, "child_jobs": child_jobs}

def _validate_bulk_app_info(self, app_info: dict):
def _validate_bulk_app_info(self: "AppManager", app_info: dict):
"""
Validation consists of:
1. must have "app_id" with format xyz/abc
Expand Down Expand Up @@ -610,7 +609,9 @@ def _validate_bulk_app_info(self, app_info: dict):
f"an app version must be a string, not {app_info['version']}"
)

def _reconstitute_shared_params(self, app_info_el: dict) -> None:
def _reconstitute_shared_params(
self: "AppManager", app_info_el: dict[str, Any]
) -> None:
"""
Mutate each params dict to include any shared_params
app_info_el is structured like:
Expand Down Expand Up @@ -639,14 +640,14 @@ def _reconstitute_shared_params(self, app_info_el: dict) -> None:
param_set.setdefault(k, v)

def _build_run_job_params(
self,
spec: dict,
self: "AppManager",
spec: dict[str, Any],
tag: str,
param_set: dict,
version: Optional[str] = None,
cell_id: Optional[str] = None,
run_id: Optional[str] = None,
ws_id: Optional[int] = None,
param_set: dict[str, Any],
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
ws_id: int | None = None,
) -> dict:
"""
Builds the set of inputs for EE2.run_job and EE2.run_job_batch (RunJobParams) given a spec
Expand Down Expand Up @@ -726,13 +727,13 @@ def _build_run_job_params(

@_app_error_wrapper
def run_local_app(
self,
app_id,
params,
tag="release",
version=None,
cell_id=None,
run_id=None,
self: "AppManager",
app_id: str,
params: dict[str, Any],
tag: str = "release",
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
widget_state=None,
):
"""
Expand Down Expand Up @@ -811,14 +812,14 @@ def run_local_app(
)

def run_local_app_advanced(
self,
app_id,
params,
self: "AppManager",
app_id: str,
params: dict[str, Any],
widget_state,
tag="release",
version=None,
cell_id=None,
run_id=None,
tag: str = "release",
version: str | None = None,
cell_id: str | None = None,
run_id: str | None = None,
):
return self.run_local_app(
app_id,
Expand All @@ -830,7 +831,9 @@ def run_local_app_advanced(
run_id=run_id,
)

def _get_validated_app_spec(self, app_id, tag, is_long, version=None):
def _get_validated_app_spec(
self: "AppManager", app_id: str, tag: str, is_long, version: str | None = None
):
if (
version is not None
and tag != "release"
Expand Down Expand Up @@ -860,7 +863,12 @@ def _get_validated_app_spec(self, app_id, tag, is_long, version=None):
)
return spec

def _map_group_inputs(self, value, spec_param, spec_params):
def _map_group_inputs(
self: "AppManager",
value: list | dict | None,
spec_param: dict[str, Any],
spec_params: dict[str, Any],
):
if isinstance(value, list):
return [self._map_group_inputs(v, spec_param, spec_params) for v in value]

Expand Down Expand Up @@ -891,7 +899,12 @@ def _map_group_inputs(self, value, spec_param, spec_params):
mapped_value[target_key] = target_val
return mapped_value

def _map_inputs(self, input_mapping, params, spec_params):
def _map_inputs(
self: "AppManager",
input_mapping: list[dict[str, Any]],
params: dict[str, Any],
spec_params: dict[str, Any],
):
"""
Maps the dictionary of parameters and inputs based on rules provided in
the input_mapping. This iterates over the list of input_mappings, and
Expand Down Expand Up @@ -977,7 +990,7 @@ def _map_inputs(self, input_mapping, params, spec_params):
inputs_list.append(inputs_dict[k])
return inputs_list

def _generate_input(self, generator):
def _generate_input(self: "AppManager", generator: dict[str, Any] | None):
"""
Generates an input value using rules given by
NarrativeMethodStore.AutoGeneratedValue.
Expand Down Expand Up @@ -1007,10 +1020,10 @@ def _generate_input(self, generator):
return ret + str(generator["suffix"])
return ret

def _send_comm_message(self, msg_type, content):
def _send_comm_message(self: "AppManager", msg_type: str, content: dict[str, Any]):
JobComm().send_comm_message(msg_type, content)

def _get_agent_token(self, name: str) -> auth.TokenInfo:
def _get_agent_token(self: "AppManager", name: str) -> auth.TokenInfo:
"""
Retrieves an agent token from the Auth service with a formatted name.
This prepends "KBApp_" to the name for filtering, and trims to make sure the name
Expand Down
Loading

0 comments on commit 16b4c20

Please sign in to comment.