SOW 2 #1504

Merged: 30 commits, Feb 26, 2019

Commits (30); the diff below shows changes from 1 of them:
debb65e Add feature flag use_runner_daemon (njsmith, Jan 24, 2019)
a9ec446 First pass at runner daemon and Unix-socket based API (njsmith, Jan 24, 2019)
116e0c2 First sketch of new api_runCancel (njsmith, Jan 24, 2019)
61e2e9b WIP: respond to review comments + first pass at process handling (njsmith, Feb 8, 2019)
b626fc2 quotes (njsmith, Feb 8, 2019)
0c908c6 Add a simple way to run the runner and dev server together (njsmith, Feb 8, 2019)
d41db40 make the output fancier than it needs to be (njsmith, Feb 8, 2019)
645d105 work around pyenv quirk (njsmith, Feb 8, 2019)
70dbcd2 Run & cancel working end-to-end with new runner daemon (njsmith, Feb 17, 2019)
273db7b Update for trio v0.11.0 (njsmith, Feb 18, 2019)
541bd5a Add trio to install_requires *if* installing on py3 (njsmith, Feb 18, 2019)
72ead85 single quotes (njsmith, Feb 18, 2019)
0995cd6 more colors (njsmith, Feb 18, 2019)
62c302d small cleanups for dev supervisor (njsmith, Feb 18, 2019)
5ba130b Update flask_client to use new SIREPO_SRDB_ROOT configury (njsmith, Feb 18, 2019)
2c0850d Add end-to-end test of runner daemon (njsmith, Feb 18, 2019)
5237cbc Clean up debugging (njsmith, Feb 18, 2019)
cdc48d0 Fix 'completed' status string to match what we've used historically (njsmith, Feb 19, 2019)
d66a632 remove some unnecessary pytest.importorskip calls (njsmith, Feb 20, 2019)
bc17b67 skip runner test if py3 environment isn't set up (njsmith, Feb 20, 2019)
d555184 remove pkdp (njsmith, Feb 26, 2019)
4b535a1 if the job's already dead, then no need to kick it while it's down (njsmith, Feb 26, 2019)
9b50e1a don't try to send fancy error messages over the wire (njsmith, Feb 26, 2019)
d65ce11 log if we see a stale 'pending' status (njsmith, Feb 26, 2019)
dab6884 index by correct variable (njsmith, Feb 26, 2019)
143b328 log if two jobs are trying to run in the same dir at the same time (njsmith, Feb 26, 2019)
66e0404 update comment on race conditions (njsmith, Feb 26, 2019)
75b3f17 rephrase comment (njsmith, Feb 26, 2019)
27e8f2d use /dev/null for job stdin (njsmith, Feb 26, 2019)
5ebd9dc log when job fails (njsmith, Feb 26, 2019)

21 changes: 13 additions & 8 deletions sirepo/pkcli/runner.py
@@ -45,7 +45,7 @@ def _catch_and_log_errors(exc_type, msg, *args, **kwargs):
try:
yield
except trio.MultiError as multi_exc:
-raise AssertionError("handle MultiErrors in _catch_and_log_errors")
+raise AssertionError('handle MultiErrors in _catch_and_log_errors')
except exc_type:
pkdlog(msg, *args, **kwargs)
pkdlog(pkdexc())
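
Note on the hunk above: `_catch_and_log_errors` is a context manager that logs a chosen exception type. A minimal sketch of that pattern follows, assuming the standard-library `logging` module in place of `pkdlog`/`pkdexc`, and assuming the exception is swallowed after logging (the visible hunk does not show whether the original re-raises). The `trio.MultiError` branch is omitted, and `run_guarded` is a hypothetical helper added only to demonstrate the behavior.

```python
import contextlib
import logging

log = logging.getLogger('runner_sketch')


@contextlib.contextmanager
def catch_and_log_errors(exc_type, msg, *args):
    """Run the with-block; log and swallow exc_type, let anything else propagate."""
    try:
        yield
    except exc_type:
        # log.exception records msg plus the active traceback, roughly what
        # the pkdlog(msg, ...) + pkdlog(pkdexc()) pair in the diff does.
        log.exception(msg, *args)


def run_guarded(func):
    """Hypothetical helper: True if func finished, False if it raised ValueError."""
    finished = False
    with catch_and_log_errors(ValueError, 'error in %s', func.__name__):
        func()
        finished = True
    return finished
```

Exceptions of any other type still propagate out of the `with` block, so the caller decides which failures are safe to log and ignore.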
@@ -131,7 +131,7 @@ async def _run_job(
return
try:
env = _subprocess_env()
-with open(run_dir / template_common.RUN_LOG, "a+b") as run_log:
+with open(run_dir / template_common.RUN_LOG, 'a+b') as run_log:
process = trio.Process(
config['command'],
cwd=config['working_dir'],
@@ -175,9 +175,9 @@ async def _job_status(job_tracker, request):
pkdc('job_status: {}', request)
job_info = job_tracker.jobs.get(request['jid'])
if job_info is None:
-return {"status": _JobStatus.NOT_STARTED.value}
+return {'status': _JobStatus.NOT_STARTED.value}
else:
-return {"status": job_info.status.value}
+return {'status': job_info.status.value}


async def _cancel_job(job_tracker, request):
@@ -195,9 +195,9 @@ async def _cancel_job(job_tracker, request):


_HANDLERS = {
-"start_job": _start_job,
-"job_status": _job_status,
-"cancel_job": _cancel_job,
+'start_job': _start_job,
+'job_status': _job_status,
+'cancel_job': _cancel_job,
}


@@ -211,7 +211,7 @@ async def _handle_conn(job_tracker, stream):
break
request_bytes += chunk
request = pkjson.load_any(request_bytes)
-handler = _HANDLERS[request["action"]]
+handler = _HANDLERS[request['action']]
response = await handler(job_tracker, request)
response_bytes = pkjson.dump_bytes(response)
except Exception as exc:
@@ -244,3 +244,8 @@ async def _main():
def start():
"""Starts the container runner."""
trio.run(_main)
+
+
+def start_dev():
+"""Temporary hack for testing: starts the dev server + container runner."""
+import subprocess
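
Note on the `_HANDLERS` and `_handle_conn` hunks: together they implement dispatch by an `action` key in a JSON request. A minimal sketch of that pattern follows, assuming the standard-library `json` and `asyncio` in place of `pkjson` and trio. The handler bodies, the plain `jobs` dict standing in for `job_tracker`, the `'running'` and `'not_started'` status strings, and the `demo` helper are all hypothetical and not taken from the PR.

```python
import asyncio
import json


async def start_job(jobs, request):
    # Hypothetical body: just record the job as running.
    jobs[request['jid']] = 'running'
    return {}


async def job_status(jobs, request):
    # Unknown job ids report a "not started" status, like the NOT_STARTED
    # branch in the diff (the exact string here is an assumption).
    return {'status': jobs.get(request['jid'], 'not_started')}


async def cancel_job(jobs, request):
    # Hypothetical body: forget the job; cancelling an unknown job is a no-op.
    jobs.pop(request['jid'], None)
    return {}


# Each entry maps an action name to its coroutine handler with a colon.
HANDLERS = {
    'start_job': start_job,
    'job_status': job_status,
    'cancel_job': cancel_job,
}


async def handle_request(jobs, request_bytes):
    """Decode one request, dispatch on its 'action', encode the response."""
    request = json.loads(request_bytes)
    handler = HANDLERS[request['action']]
    response = await handler(jobs, request)
    return json.dumps(response).encode()


def demo():
    """Send a short sequence of requests through the dispatcher."""
    jobs = {}
    requests = [
        {'action': 'job_status', 'jid': 'j1'},
        {'action': 'start_job', 'jid': 'j1'},
        {'action': 'job_status', 'jid': 'j1'},
        {'action': 'cancel_job', 'jid': 'j1'},
        {'action': 'job_status', 'jid': 'j1'},
    ]

    async def main():
        replies = []
        for req in requests:
            raw = await handle_request(jobs, json.dumps(req).encode())
            replies.append(json.loads(raw))
        return replies

    return asyncio.run(main())
```

A dict keyed by action name keeps the connection handler free of per-action branching; adding an action means adding one entry. An unknown action raises `KeyError` in this sketch, which the real `_handle_conn` would presumably meet in its `except Exception` branch.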