diff --git a/setup.py b/setup.py
index d00fd2cedf..75d4bb02bd 100644
--- a/setup.py
+++ b/setup.py
@@ -33,7 +33,8 @@
         # requires oauthlib<3.0.0.
         'requests-oauthlib==1.1.0',
         'uwsgi',
-    ],
+        'trio >= 0.11.0; python_version >= "3"',
+    ],
     license='http://www.apache.org/licenses/LICENSE-2.0.html',
     name='sirepo',
     url='http://sirepo.com',
diff --git a/sirepo/feature_config.py b/sirepo/feature_config.py
index 1fcd416584..1ad6a45391 100644
--- a/sirepo/feature_config.py
+++ b/sirepo/feature_config.py
@@ -56,6 +56,7 @@ def _codes(want_all=None):
 
 cfg = pkconfig.init(
     api_modules=((), tuple, 'optional api modules, e.g. bluesky'),
+    runner_daemon=(False, bool, 'use the runner daemon'),
     #TODO(robnagler) make sim_type config
     rs4pi_dose_calc=(False, bool, 'run the real dose calculator'),
     sim_types=(None, _cfg_sim_types, 'simulation types (codes) to be imported'),
diff --git a/sirepo/pkcli/runner.py b/sirepo/pkcli/runner.py
new file mode 100644
index 0000000000..633f67e805
--- /dev/null
+++ b/sirepo/pkcli/runner.py
@@ -0,0 +1,374 @@
+# -*- coding: utf-8 -*-
+"""The runner daemon.
+
+:copyright: Copyright (c) 2019 RadiaSoft LLC. All Rights Reserved.
+:license: http://www.apache.org/licenses/LICENSE-2.0.html
+"""
+
+from __future__ import absolute_import, division, print_function
+
+import aenum
+import async_generator
+import collections
+import contextlib
+import curses
+import functools
+import os
+from pykern.pkdebug import pkdp, pkdc, pkdlog, pkdexc
+from pykern import pkio
+from pykern import pkjson
+import re
+import requests
+import shlex
+from sirepo import mpi
+from sirepo import runner_client
+from sirepo import srdb
+from sirepo.template import template_common
+import subprocess
+import sys
+import trio
+
+# How often threaded blocking operations should wake up and check for Trio
+# cancellation
+_CANCEL_POLL_INTERVAL = 1
+
+_CHUNK_SIZE = 4096
+_LISTEN_QUEUE = 1000
+
+_KILL_TIMEOUT_SECS = 3
+#: Need to remove $OMPI and $PMIX to prevent PMIX ERROR:
+# See https://github.com/radiasoft/sirepo/issues/1323
+# We also remove SIREPO_ and PYKERN vars, because we shouldn't
+# need to pass any of that on, just like runner.docker doesn't
+_EXEC_ENV_REMOVE = re.compile('^(OMPI_|PMIX_|SIREPO_|PYKERN_)')
+
+
+@contextlib.contextmanager
+def _catch_and_log_errors(exc_type, msg, *args, **kwargs):
+    try:
+        yield
+    except trio.MultiError as multi_exc:
+        raise AssertionError('handle MultiErrors in _catch_and_log_errors')
+    except exc_type:
+        pkdlog(msg, *args, **kwargs)
+        pkdlog(pkdexc())
+
+
+# Used to make sure that if we get simultaneous RPCs for the same jid/run_dir,
+# then only one RPC handler runs at a time. Like defaultdict(trio.Lock), but
+# without the memory leak. (Maybe should be upstreamed to Trio?)
+class _LockDict:
+    def __init__(self):
+        # {key: ParkingLot}
+        # lock is held iff the key exists
+        self._waiters = {}
+
+    @async_generator.asynccontextmanager
+    async def __getitem__(self, key):
+        # acquire
+        if key not in self._waiters:
+            # lock is unheld; adding a key makes it held
+            self._waiters[key] = trio.hazmat.ParkingLot()
+        else:
+            # lock is held; wait for someone to pass it to us
+            await self._waiters[key].park()
+        try:
+            yield
+        finally:
+            # release
+            if self._waiters[key]:
+                # someone is waiting, so pass them the lock
+                self._waiters[key].unpark()
+            else:
+                # no-one is waiting, so mark the lock unheld
+                del self._waiters[key]
+
+
+# Helper to call container.wait in an async/cancellation-friendly way
+async def _container_wait(container):
+    while True:
+        try:
+            return await trio.run_sync_in_worker_thread(
+                functools.partial(container.wait, timeout=_CANCEL_POLL_INTERVAL)
+            )
+        # ReadTimeout is what the documentation says this raises.
+        # ConnectionError is what it actually raises.
+        # We'll catch both just to be safe.
+        except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError):
+            pass
+
+
+# Cut down version of simulation_db.write_result
+def _write_status(status, run_dir):
+    fn = run_dir.join('result.json')
+    if not fn.exists():
+        pkjson.dump_pretty({'state': status.value}, filename=fn)
+        pkio.write_text(run_dir.join('status'), status.value)
+
+
+def _subprocess_env():
+    env = dict(os.environ)
+    for k in list(env):
+        if _EXEC_ENV_REMOVE.search(k):
+            del env[k]
+    env['SIREPO_MPI_CORES'] = str(mpi.cfg.cores)
+    return env
+
+
+class _JobInfo:
+    def __init__(self, run_dir, jhash, status, process):
+        self.run_dir = run_dir
+        self.jhash = jhash
+        self.status = status
+        self.finished = trio.Event()
+        self.process = process
+
+
+class _JobTracker:
+    def __init__(self, nursery):
+        # XX TODO: eventually we'll need a way to stop this growing without
+        # bound, perhaps by clarifying the split in responsibilities between
+        # the on-disk simulation_db versus the in-memory status.
+        self.jobs = {}
+        self._nursery = nursery
+
+    def status(self, run_dir, jhash):
+        disk_in_path = run_dir.join('in.json')
+        disk_status_path = run_dir.join('status')
+        if disk_in_path.exists() and disk_status_path.exists():
+            disk_in_text = pkio.read_text(disk_in_path)
+            disk_jhash = pkjson.load_any(disk_in_text).reportParametersHash
+            if disk_jhash == jhash:
+                disk_status = pkio.read_text(disk_status_path)
+                if disk_status == 'pending':
+                    # We never write this, so it must be stale, in which case
+                    # the job is no longer pending...
+                    pkdlog(
+                        'found "pending" status, treating as "error" ({})',
+                        disk_status_path,
+                    )
+                    return runner_client.JobStatus.ERROR
+                return runner_client.JobStatus(disk_status)
+        if run_dir in self.jobs and self.jobs[run_dir].jhash == jhash:
+            return self.jobs[run_dir].status
+        return runner_client.JobStatus.MISSING
+
+    async def start_job(self, run_dir, jhash, cmd):
+        await self._nursery.start(self._run_job, run_dir, jhash, cmd)
+
+    async def _run_job(
+        self, run_dir, jhash, cmd, *, task_status=trio.TASK_STATUS_IGNORED
+    ):
+        # XX TODO: there are still some awkward race conditions here if a new
+        # job tries to start using the directory while another job is still
+        # using it. probably start_job should detect this, and either kill the
+        # existing job (if it has a different jhash + older serial), do
+        # nothing and report success (if the existing job has the same jhash),
+        # or error out (if the existing job has a different jhash + newer
+        # serial).
+        with _catch_and_log_errors(Exception, 'error in run_job'):
+            if run_dir in self.jobs:
+                # Right now, I don't know what happens if we reach here while
+                # the previous job is still running. The old job might be
+                # writing to the new job's freshly-initialized run_dir? This
+                # will be fixed once we move away from having server.py write
+                # directly into the run_dir.
+                pkdlog(
+                    'start_job {}: job is already running. old jhash {}, new jhash {}',
+                    run_dir, self.jobs[run_dir].jhash, jhash,
+                )
+                assert self.jobs[run_dir].jhash == jhash
+                return
+            try:
+                env = _subprocess_env()
+                run_log_path = run_dir.join(template_common.RUN_LOG)
+                # we're in py3 mode, and regular subprocesses will inherit our
+                # environment, so we have to manually switch back to py2 mode.
+                env['PYENV_VERSION'] = 'py2'
+                cmd = ['pyenv', 'exec'] + cmd
+                with open(run_log_path, 'a+b') as run_log:
+                    process = trio.Process(
+                        cmd,
+                        cwd=run_dir,
+                        start_new_session=True,
+                        stdin=subprocess.DEVNULL,
+                        stdout=run_log,
+                        stderr=run_log,
+                        env=env,
+                    )
+                self.jobs[run_dir] = _JobInfo(
+                    run_dir, jhash, runner_client.JobStatus.RUNNING, process
+                )
+                async with process:
+                    task_status.started()
+                    # XX more race conditions here, in case we're writing to
+                    # the wrong version of the directory...
+                    await process.wait()
+                if process.returncode:
+                    pkdlog(
+                        '{} {}: job failed, returncode = {}',
+                        run_dir, jhash, process.returncode,
+                    )
+                    _write_status(runner_client.JobStatus.ERROR, run_dir)
+                else:
+                    _write_status(runner_client.JobStatus.COMPLETED, run_dir)
+            finally:
+                # _write_status is a no-op if there's already a status, so
+                # this really means "if we get here without having written a
+                # status, assume there was some error"
+                _write_status(runner_client.JobStatus.ERROR, run_dir)
+                # Make sure that we clear out the running job info and tell
+                # everyone the job is done, no matter what happened
+                job_info = self.jobs.pop(run_dir, None)
+                if job_info is not None:
+                    job_info.finished.set()
+
+
+_RPC_HANDLERS = {}
+
+
+def _rpc_handler(fn):
+    _RPC_HANDLERS[fn.__name__.lstrip('_')] = fn
+    return fn
+
+
+@_rpc_handler
+async def _start_job(job_tracker, request):
+    pkdc('start_job: {}', request)
+    await job_tracker.start_job(
+        request.run_dir, request.jhash, request.cmd
+    )
+    return {}
+
+
+@_rpc_handler
+async def _job_status(job_tracker, request):
+    pkdc('job_status: {}', request)
+    return {
+        'status': job_tracker.status(request.run_dir, request.jhash).value
+    }
+
+
+@_rpc_handler
+async def _cancel_job(job_tracker, request):
+    if request.run_dir not in job_tracker.jobs:
+        return {}
+    job_info = job_tracker.jobs[request.run_dir]
+    if job_info.status is not runner_client.JobStatus.RUNNING:
+        return {}
+    job_info.status = runner_client.JobStatus.CANCELED
+    _write_status(runner_client.JobStatus.CANCELED, request.run_dir)
+    job_info.process.terminate()
+    with trio.move_on_after(_KILL_TIMEOUT_SECS):
+        await job_info.finished.wait()
+    if job_info.process.returncode is None:
+        job_info.process.kill()
+        await job_info.finished.wait()
+    return {}
+
+
+# XX should we just always acquire a per-job lock here, to make sure we never
+# have to worry about different requests for the same job racing?
+async def _handle_conn(job_tracker, lock_dict, stream):
+    with _catch_and_log_errors(Exception, 'error handling request'):
+        request_bytes = bytearray()
+        while True:
+            chunk = await stream.receive_some(_CHUNK_SIZE)
+            if not chunk:
+                break
+            request_bytes += chunk
+        request = pkjson.load_any(request_bytes)
+        if 'run_dir' in request:
+            request.run_dir = pkio.py_path(request.run_dir)
+        pkdlog('runner request: {!r}', request)
+        handler = _RPC_HANDLERS[request.action]
+        async with lock_dict[request.run_dir]:
+            response = await handler(job_tracker, request)
+        pkdlog('runner response: {!r}', response)
+        response_bytes = pkjson.dump_bytes(response)
+        await stream.send_all(response_bytes)
+
+
+async def _main():
+    pkdlog('runner daemon starting up')
+    with trio.socket.socket(family=trio.socket.AF_UNIX) as sock:
+        # XX TODO: better strategy for handoff between runner instances
+        # Clear out any stale socket file
+        sock_path = srdb.runner_socket_path()
+        pkio.unchecked_remove(sock_path)
+        await sock.bind(str(sock_path))
+        sock.listen(_LISTEN_QUEUE)
+        listener = trio.SocketListener(sock)
+
+        async with trio.open_nursery() as nursery:
+            job_tracker = _JobTracker(nursery)
+            lock_dict = _LockDict()
+            await trio.serve_listeners(
+                functools.partial(_handle_conn, job_tracker, lock_dict),
+                [listener]
+            )
+
+
+def start():
+    """Starts the runner daemon."""
+    trio.run(_main)
+
+
+# Temporary (?) hack to make testing easier: starts up the http dev server
+# under py2 and the runner daemon under py3, and if either exits then kills
+# the other.
+_RUNNER_DAEMON_OUTPUT_COLOR = 2  # green
+_FLASK_DEV_OUTPUT_COLOR = 4  # blue
+
+def _color(num):
+    colors = curses.tigetstr('setaf')
+    if colors is None:
+        return b''
+    return curses.tparm(colors, num)
+
+
+async def _run_cmd(color, cmd, **kwargs):
+    async def forward_to_stdout_with_color(stream):
+        while True:
+            data = await stream.receive_some(_CHUNK_SIZE)
+            if not data:
+                return
+            sys.stdout.buffer.raw.write(_color(color) + data + _color(0))
+
+    kwargs['stdin'] = subprocess.DEVNULL
+    kwargs['stdout'] = subprocess.PIPE
+    kwargs['stderr'] = subprocess.STDOUT
+    async with trio.open_nursery() as nursery:
+        async with trio.Process(cmd, **kwargs) as process:
+            nursery.start_soon(forward_to_stdout_with_color, process.stdout)
+            await process.wait()
+
+
+async def _dev_main():
+    curses.setupterm()
+
+    # To be inherited by children
+    os.environ['SIREPO_FEATURE_CONFIG_RUNNER_DAEMON'] = '1'
+    os.environ['PYTHONUNBUFFERED'] = '1'
+
+    async with trio.open_nursery() as nursery:
+        async def _run_cmd_in_env_then_quit(py_env_name, color, cmd):
+            env = {**os.environ, 'PYENV_VERSION': py_env_name}
+            await _run_cmd(color, ['pyenv', 'exec'] + cmd, env=env)
+            nursery.cancel_scope.cancel()
+
+        nursery.start_soon(
+            _run_cmd_in_env_then_quit,
+            'py2', _FLASK_DEV_OUTPUT_COLOR, ['sirepo', 'service', 'http'],
+        )
+        # We could just run _main here, but spawning a subprocess makes sure
+        # that everyone has the same config, e.g. for
+        # SIREPO_FEATURE_CONFIG_RUNNER_DAEMON
+        nursery.start_soon(
+            _run_cmd_in_env_then_quit,
+            'py3', _RUNNER_DAEMON_OUTPUT_COLOR, ['sirepo', 'runner', 'start'],
+        )
+
+
+def dev():
+    """Starts the runner daemon + the HTTP dev server."""
+    trio.run(_dev_main)
diff --git a/sirepo/pkcli/service.py b/sirepo/pkcli/service.py
index 565bb53718..1da3f461ed 100644
--- a/sirepo/pkcli/service.py
+++ b/sirepo/pkcli/service.py
@@ -74,11 +74,6 @@ def http():
     )
 
 
-def container_runner():
-    from sirepo.runner.container import serve
-    serve()
-
-
 def nginx_proxy():
     """Starts nginx in container.
 
diff --git a/sirepo/runner_client.py b/sirepo/runner_client.py
new file mode 100644
index 0000000000..d2830d5ed4
--- /dev/null
+++ b/sirepo/runner_client.py
@@ -0,0 +1,67 @@
+import aenum
+import contextlib
+from pykern import pkjson
+from pykern.pkdebug import pkdp, pkdc, pkdlog, pkdexc
+from sirepo import srdb
+import socket
+
+
+_CHUNK_SIZE = 4096
+
+
+class JobStatus(aenum.Enum):
+    MISSING = 'missing'      # no data on disk, not currently running
+    RUNNING = 'running'      # data on disk is incomplete but it's running
+    ERROR = 'error'          # data on disk exists, but job failed somehow
+    CANCELED = 'canceled'    # data on disk exists, but is incomplete
+    COMPLETED = 'completed'  # data on disk exists, and is fully usable
+
+
+def _rpc(request):
+    """Send an RPC message to the runner daemon, and get the response.
+
+    Args:
+        request: the request, as a json-encodeable object
+
+    Returns:
+        response: the server response
+    """
+    request_bytes = pkjson.dump_bytes(request)
+    with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
+        sock.connect(str(srdb.runner_socket_path()))
+        # send the request
+        sock.sendall(request_bytes)
+        # send EOF, so the other side knows we've sent the whole thing
+        sock.shutdown(socket.SHUT_WR)
+        # read the response
+        response_bytes = bytearray()
+        while True:
+            chunk = sock.recv(_CHUNK_SIZE)
+            if not chunk:
+                break
+            response_bytes += chunk
+    if response_bytes == b'':
+        raise AssertionError('runner daemon had an unknown error')
+    return pkjson.load_any(bytes(response_bytes))
+
+
+def start_job(run_dir, jhash, cmd):
+    return _rpc({
+        'action': 'start_job',
+        'run_dir': str(run_dir),
+        'jhash': jhash,
+        'cmd': cmd,
+    })
+
+
+def job_status(run_dir, jhash):
+    result = _rpc({
+        'action': 'job_status', 'run_dir': str(run_dir), 'jhash': jhash,
+    })
+    return JobStatus(result.status)
+
+
+def cancel_job(run_dir, jhash):
+    return _rpc({
+        'action': 'cancel_job', 'run_dir': str(run_dir), 'jhash': jhash,
+    })
diff --git a/sirepo/server.py b/sirepo/server.py
index 372bb3ebf0..9daae75ef4 100644
--- a/sirepo/server.py
+++ b/sirepo/server.py
@@ -16,6 +16,7 @@
 from sirepo import http_reply
 from sirepo import http_request
 from sirepo import runner
+from sirepo import runner_client
 from sirepo import simulation_db
 from sirepo import srdb
 from sirepo import uri_router
@@ -430,48 +431,84 @@ def api_root(simulation_type):
 def api_runCancel():
     data = _parse_data_input()
     jid = simulation_db.job_id(data)
-    # TODO(robnagler) need to have a way of listing jobs
-    # Don't bother with cache_hit check. We don't have any way of canceling
-    # if the parameters don't match so for now, always kill.
-    #TODO(robnagler) mutex required
-    if runner.job_is_processing(jid):
+    if feature_config.cfg.runner_daemon:
+        jhash = template_common.report_parameters_hash(data)
         run_dir = simulation_db.simulation_run_dir(data)
-        # Write first, since results are write once, and we want to
-        # indicate the cancel instead of the termination error that
-        # will happen as a result of the kill.
-        simulation_db.write_result({'state': 'canceled'}, run_dir=run_dir)
-        runner.job_kill(jid)
-        # TODO(robnagler) should really be inside the template (t.cancel_simulation()?)
-        # the last frame file may not be finished, remove it
-        t = sirepo.template.import_module(data)
-        if hasattr(t, 'remove_last_frame'):
-            t.remove_last_frame(run_dir)
-    # Always true from the client's perspective
-    return http_reply.gen_json({'state': 'canceled'})
+        runner_client.cancel_job(run_dir, jhash)
+        # Always true from the client's perspective
+        return http_reply.gen_json({'state': 'canceled'})
+    else:
+        # TODO(robnagler) need to have a way of listing jobs
+        # Don't bother with cache_hit check. We don't have any way of canceling
+        # if the parameters don't match so for now, always kill.
+        #TODO(robnagler) mutex required
+        if runner.job_is_processing(jid):
+            run_dir = simulation_db.simulation_run_dir(data)
+            # Write first, since results are write once, and we want to
+            # indicate the cancel instead of the termination error that
+            # will happen as a result of the kill.
+            simulation_db.write_result({'state': 'canceled'}, run_dir=run_dir)
+            runner.job_kill(jid)
+            # TODO(robnagler) should really be inside the template (t.cancel_simulation()?)
+            # the last frame file may not be finished, remove it
+            t = sirepo.template.import_module(data)
+            if hasattr(t, 'remove_last_frame'):
+                t.remove_last_frame(run_dir)
+        # Always true from the client's perspective
+        return http_reply.gen_json({'state': 'canceled'})
 
 
 @api_perm.require_user
 def api_runSimulation():
+    from pykern import pkjson
     data = _parse_data_input(validate=True)
-    res = _simulation_run_status(data, quiet=True)
-    if (
-        (
-            not res['state'] in _RUN_STATES
-            and (res['state'] != 'completed' or data.get('forceRun', False))
-        ) or res.get('parametersChanged', True)
-    ):
-        try:
-            _start_simulation(data)
-        except runner.Collision:
-            pkdlog('{}: runner.Collision, ignoring start', simulation_db.job_id(data))
-    res = _simulation_run_status(data)
-    return http_reply.gen_json(res)
+    # if flag is set
+    # - check status
+    # - if status is bad, rewrite the run dir (XX race condition, to fix later)
+    # - then request it be started
+    if feature_config.cfg.runner_daemon:
+        jhash = template_common.report_parameters_hash(data)
+        run_dir = simulation_db.simulation_run_dir(data)
+        status = runner_client.job_status(run_dir, jhash)
+        already_good_status = [runner_client.JobStatus.RUNNING,
+                               runner_client.JobStatus.COMPLETED]
+        if status not in already_good_status:
+            data['simulationStatus'] = {
+                'startTime': int(time.time()),
+                'state': 'pending',
+            }
+            # XX TODO: prepare in a temp directory
+            cmd, _ = simulation_db.prepare_simulation(data)
+            # XX TODO: prepare_simulation shouldn't create this file in the
+            # first place -- managing this file is runner.py's job.
+            pkio.unchecked_remove(run_dir.join('status'))
+            runner_client.start_job(run_dir, jhash, cmd)
+        res = _simulation_run_status_runner_daemon(data, quiet=True)
+        return http_reply.gen_json(res)
+    else:
+        res = _simulation_run_status(data, quiet=True)
+        if (
+            (
+                not res['state'] in _RUN_STATES
+                and (res['state'] != 'completed' or data.get('forceRun', False))
+            ) or res.get('parametersChanged', True)
+        ):
+            try:
+                _start_simulation(data)
+            except runner.Collision:
+                pkdlog('{}: runner.Collision, ignoring start', simulation_db.job_id(data))
+        res = _simulation_run_status(data)
+        return http_reply.gen_json(res)
 
 
 @api_perm.require_user
 def api_runStatus():
     data = _parse_data_input()
-    return http_reply.gen_json(_simulation_run_status(data))
+    if feature_config.cfg.runner_daemon:
+        status = _simulation_run_status_runner_daemon(data)
+    else:
+        status = _simulation_run_status(data)
+    return http_reply.gen_json(status)
 
 
 @api_perm.allow_visitor
@@ -805,6 +842,84 @@ def _simulation_name(res, path, data):
     res.append(data['models']['simulation']['name'])
 
 
+def _simulation_run_status_runner_daemon(data, quiet=False):
+    """Look for simulation status and output
+
+    Args:
+        data (dict): request
+        quiet (bool): don't write errors to log
+
+    Returns:
+        dict: status response
+    """
+    try:
+        run_dir = simulation_db.simulation_run_dir(data)
+        jhash = template_common.report_parameters_hash(data)
+        status = runner_client.job_status(run_dir, jhash)
+        is_running = status is runner_client.JobStatus.RUNNING
+        rep = simulation_db.report_info(data)
+        res = {'state': status.value}
+        template = sirepo.template.import_module(data)
+        if not is_running:
+            if run_dir.exists():
+                if hasattr(template, 'prepare_output_file') and 'models' in data:
+                    template.prepare_output_file(rep, data)
+                res2, err = simulation_db.read_result(run_dir)
+                if err:
+                    if simulation_db.is_parallel(data):
+                        # allow parallel jobs to use template to parse errors below
+                        res['state'] = 'error'
+                    else:
+                        if hasattr(template, 'parse_error_log'):
+                            res = template.parse_error_log(rep.run_dir)
+                            if res:
+                                return res
+                        return _simulation_error(err, 'error in read_result', rep.run_dir)
+                else:
+                    res = res2
+        if simulation_db.is_parallel(data):
+            new = template.background_percent_complete(
+                rep.model_name,
+                rep.run_dir,
+                is_running,
+            )
+            new.setdefault('percentComplete', 0.0)
+            new.setdefault('frameCount', 0)
+            res.update(new)
+        res['parametersChanged'] = rep.parameters_changed
+        if res['parametersChanged']:
+            pkdlog(
+                '{}: parametersChanged=True req_hash={} cached_hash={}',
+                rep.job_id,
+                rep.req_hash,
+                rep.cached_hash,
+            )
+        #TODO(robnagler) verify serial number to see what's newer
+        res.setdefault('startTime', _mtime_or_now(rep.input_file))
+        res.setdefault('lastUpdateTime', _mtime_or_now(rep.run_dir))
+        res.setdefault('elapsedTime', res['lastUpdateTime'] - res['startTime'])
+        if is_running:
+            res['nextRequestSeconds'] = simulation_db.poll_seconds(rep.cached_data)
+            res['nextRequest'] = {
+                'report': rep.model_name,
+                'reportParametersHash': rep.cached_hash,
+                'simulationId': rep.cached_data['simulationId'],
+                'simulationType': rep.cached_data['simulationType'],
+            }
+        pkdc(
+            '{}: processing={} state={} cache_hit={} cached_hash={} data_hash={}',
+            rep.job_id,
+            is_running,
+            res['state'],
+            rep.cache_hit,
+            rep.cached_hash,
+            rep.req_hash,
+        )
+    except Exception:
+        return _simulation_error(pkdexc(), quiet=quiet)
+    return res
+
+
 def _simulation_run_status(data, quiet=False):
     """Look for simulation status and output
 
diff --git a/sirepo/srdb.py b/sirepo/srdb.py
index bc5ef34d0a..f14c5d4f9f 100644
--- a/sirepo/srdb.py
+++ b/sirepo/srdb.py
@@ -26,6 +26,10 @@ def root():
     return _root
 
 
+def runner_socket_path():
+    return root() / 'runner.sock'
+
+
 def server_init_root(value):
     _init_root(value)
     return root()
diff --git a/sirepo/srunit.py b/sirepo/srunit.py
index 843a990ec4..4c3e24f46e 100644
--- a/sirepo/srunit.py
+++ b/sirepo/srunit.py
@@ -40,7 +40,7 @@ def flask_client(cfg=None):
     if not cfg:
         cfg = {}
     wd = pkunit.work_dir()
-    cfg['SIREPO_SERVER_DB_DIR'] = str(pkio.mkdir_parent(wd.join('db')))
+    cfg['SIREPO_SRDB_ROOT'] = str(pkio.mkdir_parent(wd.join('db')))
     if not (server and hasattr(server.app, a)):
         with pkio.save_chdir(wd):
             pkconfig.reset_state_for_testing(cfg)
diff --git a/tests/runner_test.py b/tests/runner_test.py
new file mode 100644
index 0000000000..c2303bbf0e
--- /dev/null
+++ b/tests/runner_test.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+u"""End-to-end tests of the runner daemon feature.
+
+:copyright: Copyright (c) 2019 RadiaSoft LLC. All Rights Reserved.
+:license: http://www.apache.org/licenses/LICENSE-2.0.html
+"""
+from __future__ import absolute_import, division, print_function
+
+import pytest
+import time
+import os
+import subprocess
+import sys
+
+
+# Simple test that we can (1) run something (runSimulation), (2) get results
+# (runStatus).
+def test_runner_myapp():
+    os.environ['SIREPO_FEATURE_CONFIG_RUNNER_DAEMON'] = '1'
+    os.environ['PYTHONUNBUFFERED'] = '1'
+
+    # Check if the py3 environment is set up
+    py3_env = dict(os.environ)
+    py3_env['PYENV_VERSION'] = 'py3'
+    returncode = subprocess.call(
+        ['pyenv', 'exec', 'sirepo', '--help'], env=py3_env
+    )
+    # if 'sirepo' isn't found, returncode == 127
+    if returncode != 1:
+        pytest.skip('py3 environment not configured')
+
+    from sirepo import srunit
+    from pykern import pkunit
+    from pykern import pkio
+    from pykern.pkdebug import pkdlog
+
+    fc = srunit.flask_client()
+
+    from sirepo import srdb
+    pkdlog(srdb.runner_socket_path())
+
+    pkio.unchecked_remove(srdb.runner_socket_path())
+
+    runner_env = dict(py3_env)
+    runner_env['SIREPO_SRDB_ROOT'] = str(srdb.root())
+    runner = subprocess.Popen(
+        ['pyenv', 'exec', 'sirepo', 'runner', 'start'], env=runner_env
+    )
+    try:
+        # Wait for the server to have started up
+        while not srdb.runner_socket_path().exists():
+            time.sleep(0.1)
+
+        fc.get('/myapp')
+        data = fc.sr_post(
+            'listSimulations',
+            {'simulationType': 'myapp',
+             'search': {'simulationName': 'heightWeightReport'}},
+        )
+        pkdlog(data)
+        data = data[0].simulation
+        pkdlog(data)
+        data = fc.sr_get(
+            'simulationData',
+            params=dict(
+                pretty='1',
+                simulation_id=data.simulationId,
+                simulation_type='myapp',
+            ),
+        )
+        pkdlog(data)
+        run = fc.sr_post(
+            'runSimulation',
+            dict(
+                forceRun=False,
+                models=data.models,
+                report='heightWeightReport',
+                simulationId=data.models.simulation.simulationId,
+                simulationType=data.simulationType,
+            ),
+        )
+        pkdlog(run)
+        for _ in range(10):
+            run = fc.sr_post(
+                'runStatus',
+                run.nextRequest
+            )
+            pkdlog(run)
+            if run.state == 'completed':
+                break
+            time.sleep(1)
+        else:
+            pkunit.pkfail('runStatus: failed to complete: {}', run)
+        # Just double-check it actually worked
+        assert u'plots' in run
+    finally:
+        runner.terminate()
+        runner.wait()
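
The RPC pattern used by runner_client._rpc and _handle_conn above is one JSON object per direction over a Unix-domain socket: the client half-closes its write side to mark end-of-request, and the daemon closes the connection to mark end-of-response. The following is a minimal stdlib-only sketch of one such round trip, assuming a daemon is already listening; the socket path, run_dir, and jhash values are made-up illustrations, not produced by this diff.

    # Illustrative sketch only; mirrors the framing in runner_client._rpc.
    import contextlib
    import json
    import socket

    def rpc(socket_path, request):
        """Send one JSON request, half-close, then read the JSON response."""
        with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
            sock.connect(socket_path)
            sock.sendall(json.dumps(request).encode('utf-8'))
            sock.shutdown(socket.SHUT_WR)  # EOF tells the daemon the request is complete
            response = bytearray()
            while True:
                chunk = sock.recv(4096)
                if not chunk:              # daemon closed the connection: response is complete
                    break
                response += chunk
        return json.loads(bytes(response).decode('utf-8'))

    # Example (hypothetical paths and hash):
    # rpc('/tmp/srdb/runner.sock',
    #     {'action': 'job_status', 'run_dir': '/tmp/srdb/run', 'jhash': 'abc123'})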