SOW 2 #1504
Conversation
sirepo/pkcli/runner.py
Outdated
simulation_db.write_result({'state': 'canceled'}, run_dir=job_info.run_dir)
# XX TODO: this is what api_runCancel used to do, but we can't really do
# it here. What should we do? (For a cancelled job, should we just delete
# the run-dir entirely, once we have the full hash in the jid?)
Canceled jobs can contain usable data. When we run parallel simulations, the user can cancel at any time and still have the results already computed. Perhaps this should be a different state.
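For illustration, a minimal sketch of what that distinct state might look like, assuming an enum shaped like the runner's JobStatus used elsewhere in this PR; the CANCELED member is the suggestion under discussion, not existing code:

import enum

class JobStatus(enum.Enum):
    # Shaped like runner_client.JobStatus in this PR; the exact strings
    # are illustrative (the PR itself debates 'complete' vs. 'completed').
    MISSING = 'missing'
    PENDING = 'pending'
    RUNNING = 'running'
    ERROR = 'error'
    COMPLETED = 'completed'
    # Hypothetical addition: canceled early, but partial results usable.
    CANCELED = 'canceled'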
Looks good. I didn't see any issues, just some style/encapsulation comments.
I think we should drop docker next so we can get the workflow right in a real environment. We can rely on python2 -m sirepo.sirepo_console srw run run_dir for now.
Status: the structure's there, it starts up (relatively conveniently with the
Looks good. Keep at it. If you need help with simulation_db or server.py let me know.
OK, I guess you'll want some tweaks, but I think this has a working implementation of everything in SOW 2 now!

How to run it

We're running flask under py2, and the runner daemon under py3. We already have pyenv set up to create both a py2 and a py3 environment; now we need sirepo installed in both. So something like:

$ pyenv global py2:py3
$ pip3 install -e ~/src/radiasoft/pykern
$ mkdir -p ~/src/njsmith
$ cd ~/src/njsmith
$ gcl sirepo
$ cd sirepo
$ git checkout njs-sow-2
$ pip3 install -e .
$ pip2 install -e .
# Make sure it worked:
$ PYENV_VERSION=py3 pyenv exec sirepo --help
$ [[ $(cat $(pyenv prefix py2)/lib/python*/site-packages/sirepo*) =~ $PWD ]] && echo ok
$ [[ $(cat $(pyenv prefix py3)/lib/python*/site-packages/sirepo*) =~ $PWD ]] && echo ok

To spin up both the flask server and runner daemon to try it:

$ PYENV_VERSION=py3 SIREPO_FEATURE_CONFIG_RUNNER_DAEMON=1 pyenv exec sirepo runner dev

To run the test:

$ cd ~/src/njsmith/sirepo
$ pytest tests -k runner_test

The test assumes you have the py3 env set up. We might want to change that before merging? Because of #1528, I haven't been able to run the full test suite.

Other notes

I ended up switching to using
I updated your comment to include a better environment test and to install the branch in py2 as well. It won't work in ~/src/radiasoft, so better to pull your branch and test there for now. Need to pass the feature_config. Here's a patch to fix a bug when running the server:

diff --git a/sirepo/simulation_db.py b/sirepo/simulation_db.py
index f7e6b3a0..965bdec2 100644
--- a/sirepo/simulation_db.py
+++ b/sirepo/simulation_db.py
@@ -918,7 +918,7 @@ def write_result(result, run_dir=None):
# Don't overwrite first written file, because first write is
# closest to the reason is stopped (e.g. canceled)
return
- result.setdefault('state', 'completed')
+ result.setdefault('state', 'complete')
write_json(fn, result)
write_status(result['state'], run_dir)
input_file = json_filename(template_common.INPUT_BASE_NAME, run_dir)

When I run the test, I get:

$ pytest tests -k runner_test
================================================================================= test session starts ==================================================================================
platform linux2 -- Python 2.7.14, pytest-3.2.3, py-1.7.0, pluggy-0.4.0
rootdir: /home/vagrant/src/njsmith/sirepo, inifile:
plugins: forked-0.2
collected 37 items
tests/runner_test.py F
======================================================================================= FAILURES =======================================================================================
__________________________________________________________________________________ test_runner_myapp ___________________________________________________________________________________
Traceback (most recent call last):
File "/home/vagrant/src/njsmith/sirepo/tests/runner_test.py", line 77, in test_runner_myapp
run.nextRequest
File "/home/vagrant/src/radiasoft/pykern/pykern/pkcollections.py", line 56, in __getattr__
return self.__getattribute__(name)
AttributeError: 'Dict' object has no attribute 'nextRequest'
--------------------------------------------------------------------------------- Captured stdout call ---------------------------------------------------------------------------------
/home/vagrant/src/njsmith/sirepo/tests/runner_work/db/runner.sock
[{u'name': u'Scooby Doo', u'isExample': True, u'simulation': {u'simulationSerial': 1550518848490481, u'isExample': True, u'name': u'Scooby Doo', u'documentationUrl': u'', u'notes': u'', u'outOfSessionSimulationId': u'', u'folder': u'/', u'simulationId': u'a11pySID'}, u'last_modified': u'2019-02-18 19:40', u'folder': u'/', u'simulationId': u'a11pySID'}]
{u'simulationSerial': 1550518848490481, u'isExample': True, u'name': u'Scooby Doo', u'documentationUrl': u'', u'notes': u'', u'outOfSessionSimulationId': u'', u'folder': u'/', u'simulationId': u'a11pySID'}
{u'models': {u'simFolder': {}, u'heightWeightReport': {}, u'dog': {u'weight': 70.25, u'gender': u'male', u'breed': u'Great Dane', u'height': 81.28, u'disposition': u'friendly', u'favoriteTreat': u''}, u'simulation': {u'simulationSerial': 1550518848490481, u'outOfSessionSimulationId': u'', u'name': u'Scooby Doo', u'documentationUrl': u'', u'notes': u'', u'isExample': True, u'folder': u'/', u'simulationId': u'a11pySID'}}, u'simulationType': u'myapp', u'version': u'20190218.183041'}
{u'lastUpdateTime': 1550518848, u'nextRequest': {u'report': u'heightWeightReport', u'simulationId': u'a11pySID', u'simulationType': u'myapp', u'reportParametersHash': u'cc093c5d2ff817018618f1422b06fb8e'}, u'elapsedTime': 0, u'parametersChanged': False, u'state': u'running', u'nextRequestSeconds': 1, u'startTime': 1550518848}
{u'lastUpdateTime': 1550518848, u'nextRequest': {u'report': u'heightWeightReport', u'simulationId': u'a11pySID', u'simulationType': u'myapp', u'reportParametersHash': u'cc093c5d2ff817018618f1422b06fb8e'}, u'elapsedTime': 0, u'parametersChanged': False, u'state': u'running', u'nextRequestSeconds': 1, u'startTime': 1550518848}
{u'lastUpdateTime': 1550518849, u'nextRequest': {u'report': u'heightWeightReport', u'simulationId': u'a11pySID', u'simulationType': u'myapp', u'reportParametersHash': u'cc093c5d2ff817018618f1422b06fb8e'}, u'elapsedTime': 1, u'parametersChanged': False, u'state': u'running', u'nextRequestSeconds': 1, u'startTime': 1550518848}
{u'x_range': [0.0, 11.0], u'lastUpdateTime': 1550518849, u'title': u'Dog Height and Weight Over Time', u'x_label': u'Age (years)', u'y_label': u'', u'x_points': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0], u'elapsedTime': 1, u'parametersChanged': False, u'state': u'complete', u'startTime': 1550518848, u'plots': [{u'color': u'#1f77b4', u'points': [0.2585063948347131, 40.21214187401185, 62.86256433979385, 70.72255244830933, 73.34528811291915, 75.79496025061201, 76.65710318744058, 77.46562824139762, 77.54195385075052, 76.99962352192519, 77.78121331936578, 78.34547050274512], u'name': u'height', u'label': u'Height [cm]'}, {u'color': u'#ff7f0e', u'points': [1.633799347829997, 34.95554368122375, 53.53419538335956, 61.49624285854372, 63.19572998363415, 64.95538663173728, 65.17655914923125, 65.74938620364235, 66.76049226524795, 66.3744414760862, 66.16140689716958, 67.02105552245497], u'name': u'weight', u'label': u'Weight [lbs]'}], u'y_range': [0.2585063948347131, 78.34547050274512]}
================================================================================= 36 tests deselected ==================================================================================
======================================================================= 1 failed, 36 deselected in 5.96 seconds ========================================================================
Nice catch on the 'complete' vs. 'completed' mismatch. This also explains the test failure you saw: the status-checking loop I copied into the test was checking for the wrong state string. So my guess is that if you pull the latest version of my branch, the test should start passing for you. This still doesn't explain why I'm having trouble running the other tests, but I guess it must be something unrelated...
Leaving it as completed is fine. Could also fix the code.
Fail fast. Let the computer do the thinking. :)
I realized that we should be testing JobStatus. That was the missing fail-fast. When read_status runs, it should validate against JobStatus.
The new code does validate against JobStatus.
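For reference, a hedged sketch of that fail-fast validation. read_status here is a hypothetical helper name; JobStatus, pkio.read_text, and the status-file layout come from this PR, and the runner_client module path is assumed from how it is referenced in the diffs:

from pykern import pkio
from sirepo import runner_client  # module path as referenced in this PR

def read_status(run_dir):
    # Coerce the on-disk status string through the JobStatus enum so an
    # unexpected value raises ValueError immediately (fail fast) instead
    # of propagating a bogus state.
    status_file = run_dir.join('status')
    if not status_file.exists():
        return runner_client.JobStatus.MISSING
    return runner_client.JobStatus(pkio.read_text(status_file))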
I updated this so that the test automatically skips itself if you don't have the new py3 environment stuff set up. I guess eventually we'll want to make the py3 environment a standard thing, but I don't want to break your test suite in the meantime if this is merged :-).
pkdp and print are not allowed. This patch converts to pkdlog.

diff --git a/sirepo/pkcli/runner.py b/sirepo/pkcli/runner.py
index 82654a3d..c9a99adb 100644
--- a/sirepo/pkcli/runner.py
+++ b/sirepo/pkcli/runner.py
@@ -264,11 +264,11 @@ async def _handle_conn(job_tracker, lock_dict, stream):
request = pkjson.load_any(request_bytes)
if 'run_dir' in request:
request.run_dir = pkio.py_path(request.run_dir)
- pkdp('runner request: {!r}', request)
+ pkdlog('runner request: {!r}', request)
handler = _RPC_HANDLERS[request.action]
async with lock_dict[request.run_dir]:
response = await handler(job_tracker, request)
- pkdp('runner response: {!r}', response)
+ pkdlog('runner response: {!r}', response)
response_bytes = pkjson.dump_bytes(response)
except Exception as exc:
await stream.send_all(
diff --git a/tests/runner_test.py b/tests/runner_test.py
index c5a39d94..7ee0de61 100644
--- a/tests/runner_test.py
+++ b/tests/runner_test.py
@@ -35,8 +35,9 @@ def test_runner_myapp():
fc = srunit.flask_client()
+ from pykern.pkdebug import pkdlog
from sirepo import srdb
- print(srdb.runner_socket_path())
+ pkdlog(srdb.runner_socket_path())
pkio.unchecked_remove(srdb.runner_socket_path())
@@ -56,9 +57,9 @@ def test_runner_myapp():
{'simulationType': 'myapp',
'search': {'simulationName': 'heightWeightReport'}},
)
- print(data)
+ pkdlog(data)
data = data[0].simulation
- print(data)
+ pkdlog(data)
data = fc.sr_get(
'simulationData',
params=dict(
@@ -67,7 +68,7 @@ def test_runner_myapp():
simulation_type='myapp',
),
)
- print(data)
+ pkdlog(data)
run = fc.sr_post(
'runSimulation',
dict(
@@ -78,13 +79,13 @@ def test_runner_myapp():
simulationType=data.simulationType,
),
)
- print(run)
+ pkdlog(run)
for _ in range(10):
run = fc.sr_post(
'runStatus',
run.nextRequest
)
- print(run)
+ pkdlog(run)
if run.state == 'completed':
break
time.sleep(1)
Some comments. I'll probably have more. One general comment: I think there are assumptions about the state of the global data structures on return from an await that can't be made. I would assume that anything can happen once you release the thread of control. Therefore, you need to validate jhash to be sure that the job hasn't been switched out. I could be wrong. I haven't followed the LockDict code so perhaps that guarantees something about the globals.
sirepo/pkcli/runner.py
Outdated
pkdp('runner response: {!r}', response)
response_bytes = pkjson.dump_bytes(response)
except Exception as exc:
await stream.send_all(
If there is an error on the stream itself, you can't send to it. It's safer to protect against that rather than raise another exception.
I think it shouldn't send back the error string, and instead, just send back "error". It breaks the abstraction to respond with raw errors. Rather, just log the error here. If there's a known error, translate it to a known error code for the sender.
Logging might look like: pkdlog('Request={} error={} stack=', request, exc, pkdexc())
If there is an error on the stream itself, you can't send to it. It's safer to protect against that rather than raise another exception.

In Python 3 we have implicit exception chaining, so if stream.send_all raises an exception, then both the original exception+traceback and the new exception+traceback will be logged.

Also, we're already logging any leaking exceptions with _catch_and_log_errors, so that's not an issue.

Easy enough to just send back "error" though, I'll do that.
Actually, we might as well delete this bit and make the error indicator be "empty response" – adding fancier error handling code just produces more chances for things to go wrong :-)
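A hedged sketch of that simplification, assuming the trio/pkjson pieces from the diff above; _receive_all is a hypothetical stand-in for the chunked receive loop in the diff:

async def _handle_conn(job_tracker, lock_dict, stream):
    # Sketch: on success send the response; on any error, log it and
    # send nothing. The empty response / closed socket *is* the error
    # indicator, so there is no error-path send that can itself fail
    # and cascade.
    try:
        # _receive_all stands in for the chunked receive loop above.
        request = pkjson.load_any(await _receive_all(stream))
        if 'run_dir' in request:
            request.run_dir = pkio.py_path(request.run_dir)
        handler = _RPC_HANDLERS[request.action]
        async with lock_dict[request.run_dir]:
            response = await handler(job_tracker, request)
        await stream.send_all(pkjson.dump_bytes(response))
    except Exception:
        pkdlog('error handling request: {}', pkdexc())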
In Python 3 we have implicit exception chaining, so if stream.send_all raises an exception, then both the original exception+traceback and the new exception+traceback will be logged.

...actually, I'm wrong, because pkdexc is buggy on py3 :-( radiasoft/pykern#29
If there is an error on the stream itself, you can't send to it. It's safer to protect against that rather than raise another exception.

Easy enough to just send back "error" though, I'll do that.

To be clearer: if stream is in error, you shouldn't send on it, because you'll get a cascade that could be avoided. I feel like in error code we have to take pains to avoid that type of cascade, because it makes debugging harder, even if py3 gives you all the tracebacks.
# everyone the job is done, no matter what happened
job_info = self.jobs.pop(run_dir, None)
if job_info is not None:
    job_info.finished.set()
Need to validate the jhash, because it could be a different job.
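A minimal sketch of that check, as a hypothetical helper; self.jobs, jhash, and finished come from the _JobInfo/_JobTracker structures in this PR, while the _finish_job name is invented:

def _finish_job(self, run_dir, jhash):
    # Only clear the tracker entry and wake waiters if the entry still
    # belongs to *this* job -- after an await, a job with a different
    # jhash may have replaced it in self.jobs.
    job_info = self.jobs.get(run_dir)
    if job_info is not None and job_info.jhash == jhash:
        del self.jobs[run_dir]
        job_info.finished.set()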
Thanks for the review! I'll take a look later today.
…On Sun, Feb 24, 2019, 05:55 Rob Nagler wrote:
Rob Nagler commented on this pull request.
Some comments. I'll probably have more. One general comment: I think there
are assumptions about the state of the global data structures on return
from an await that can't be made. I would assume that anything can happen
once you release the thread of control. Therefore, you need to validate
jhash to be sure that the job hasn't been switched out. I could be wrong. I
haven't followed the LockDict code so perhaps that guarantees something
about the globals.
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + while True:
+ chunk = await stream.receive_some(_CHUNK_SIZE)
+ if not chunk:
+ break
+ request_bytes += chunk
+ request = pkjson.load_any(request_bytes)
+ if 'run_dir' in request:
+ request.run_dir = pkio.py_path(request.run_dir)
+ pkdp('runner request: {!r}', request)
+ handler = _RPC_HANDLERS[request.action]
+ async with lock_dict[request.run_dir]:
+ response = await handler(job_tracker, request)
+ pkdp('runner response: {!r}', response)
+ response_bytes = pkjson.dump_bytes(response)
+ except Exception as exc:
+ await stream.send_all(
If there is an error on the stream itself, you can't send to it. It's
safer to protect against that rather than raise another exception.
I think it shouldn't send back the error string, and instead, just send
back "error". It breaks the abstraction to respond with raw errors. Rather,
just log the error here. If there's a known error, translate it to a known
error code for the sender.
Logging might look like: pkdlog('Request={} error={} stack=', request,
exc, pkdexc())
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + if 'run_dir' in request:
+ request.run_dir = pkio.py_path(request.run_dir)
+ pkdp('runner request: {!r}', request)
+ handler = _RPC_HANDLERS[request.action]
+ async with lock_dict[request.run_dir]:
+ response = await handler(job_tracker, request)
+ pkdp('runner response: {!r}', response)
+ response_bytes = pkjson.dump_bytes(response)
+ except Exception as exc:
+ await stream.send_all(
+ pkjson.dump_bytes({'error_string': repr(exc)})
+ )
+ # Let's also log the full error here
+ raise
+ else:
+ await stream.send_all(response_bytes)
If there's a BaseException (e.g. SystemExit) it will send a response
anyway. I don't think that makes sense, because we don't know that
response_bytes is valid here. I think the send should be inside the try,
and any BaseExceptions are simply raised implicitly.
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + self.jhash = jhash
+ self.status = status
+ self.finished = trio.Event()
+ self.process = process
+
+
+class _JobTracker:
+ def __init__(self, nursery):
+ # XX TODO: eventually we'll need a way to stop this growing without
+ # bound, perhaps by clarifying the split in responsibilities between
+ # the on-disk simulation_db versus the in-memory status.
+ self.jobs = {}
+ self._nursery = nursery
+
+ def status(self, run_dir, jhash):
+ disk_in_path = run_dir.join('in.json')
Reading from disk in this case needs to be protected by a try and report
an error or missing status.
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + # XX TODO: eventually we'll need a way to stop this growing without
+ # bound, perhaps by clarifying the split in responsibilities between
+ # the on-disk simulation_db versus the in-memory status.
+ self.jobs = {}
+ self._nursery = nursery
+
+ def status(self, run_dir, jhash):
+ disk_in_path = run_dir.join('in.json')
+ disk_status_path = run_dir.join('status')
+ if disk_in_path.exists() and disk_status_path.exists():
+ disk_in_text = pkio.read_text(disk_in_path)
+ disk_jhash = pkjson.load_any(disk_in_text).reportParametersHash
+ if disk_jhash == jhash:
+ disk_status = pkio.read_text(disk_status_path)
+ if disk_status == 'pending':
+ # We never write this, so it must be stale, in which case
log this
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + return runner_client.JobStatus(disk_status)
+ if run_dir in self.jobs and self.jobs[run_dir].jhash == jhash:
+ return self.jobs[run_dir].status
+ return runner_client.JobStatus.MISSING
+
+ async def start_job(self, run_dir, jhash, cmd):
+ await self._nursery.start(self._run_job, run_dir, jhash, cmd)
+
+ async def _run_job(
+ self, run_dir, jhash, cmd, *, task_status=trio.TASK_STATUS_IGNORED
+ ):
+ # XX TODO: there are all kinds of race conditions here, e.g. if the
+ # same jid gets started multiple times in parallel... we need to
+ # revisit this once jids are globally unique, and possibly add
+ # features like "if there is another job running with the same
+ # user/sim/job but a different hash, then auto-cancel that other job"
and a newer serial
<https://github.com/radiasoft/sirepo/blob/3f7a4ba4333e8649775a4eac6e18d724442114bc/sirepo/simulation_db.py#L859>
.
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + return runner_client.JobStatus.MISSING
+
+ async def start_job(self, run_dir, jhash, cmd):
+ await self._nursery.start(self._run_job, run_dir, jhash, cmd)
+
+ async def _run_job(
+ self, run_dir, jhash, cmd, *, task_status=trio.TASK_STATUS_IGNORED
+ ):
+ # XX TODO: there are all kinds of race conditions here, e.g. if the
+ # same jid gets started multiple times in parallel... we need to
+ # revisit this once jids are globally unique, and possibly add
+ # features like "if there is another job running with the same
+ # user/sim/job but a different hash, then auto-cancel that other job"
+ with _catch_and_log_errors(Exception, 'error in run_job'):
+ if run_dir in self.jobs:
+ # Right now, I don't know what happens if we reach here while
log this
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + # revisit this once jids are globally unique, and possibly add
+ # features like "if there is another job running with the same
+ # user/sim/job but a different hash, then auto-cancel that other job"
+ with _catch_and_log_errors(Exception, 'error in run_job'):
+ if run_dir in self.jobs:
+ # Right now, I don't know what happens if we reach here while
+ # the previous job is still running. The old job might be
+ # writing to the new job's freshly-initialized run_dir? This
+ # will be fixed once we move away from having server.py write
+ # directly into the run_dir.
+ assert self.jobs[jhash] == jhash
+ return
+ try:
+ env = _subprocess_env()
+ run_log_path = run_dir.join(template_common.RUN_LOG)
+ # hack: switch back to py2 mode to run the actual command
Not a hack. It's going to be what we do for a while. It will eventually
need to be encapsulated, because the docker container will need the same
code.
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + # same jid gets started multiple times in parallel... we need to
+ # revisit this once jids are globally unique, and possibly add
+ # features like "if there is another job running with the same
+ # user/sim/job but a different hash, then auto-cancel that other job"
+ with _catch_and_log_errors(Exception, 'error in run_job'):
+ if run_dir in self.jobs:
+ # Right now, I don't know what happens if we reach here while
+ # the previous job is still running. The old job might be
+ # writing to the new job's freshly-initialized run_dir? This
+ # will be fixed once we move away from having server.py write
+ # directly into the run_dir.
+ assert self.jobs[jhash] == jhash
+ return
+ try:
+ env = _subprocess_env()
+ run_log_path = run_dir.join(template_common.RUN_LOG)
I've been using hungarian notation run_log_f for file and run_d for
directory. Long local variable names also don't have a lot of value,
especially when they are just cached values like this.
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + # will be fixed once we move away from having server.py write
+ # directly into the run_dir.
+ assert self.jobs[jhash] == jhash
+ return
+ try:
+ env = _subprocess_env()
+ run_log_path = run_dir.join(template_common.RUN_LOG)
+ # hack: switch back to py2 mode to run the actual command
+ env['PYENV_VERSION'] = 'py2'
+ cmd = ['pyenv', 'exec'] + cmd
+ with open(run_log_path, 'a+b') as run_log:
+ process = trio.Process(
+ cmd,
+ cwd=run_dir,
+ start_new_session=True,
+ stdin=run_log, # XX TODO: should be subprocess.DEVNULL?
definitely don't want to pass the run_log to stdin. It should be devnull.
Nobody should be reading stdin.
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + cwd=run_dir,
+ start_new_session=True,
+ stdin=run_log, # XX TODO: should be subprocess.DEVNULL?
+ stdout=run_log,
+ stderr=run_log,
+ env=env,
+ )
+ self.jobs[run_dir] = _JobInfo(
+ run_dir, jhash, runner_client.JobStatus.RUNNING, process
+ )
+ async with process:
+ task_status.started()
+ # XX more race conditions here, in case we're writing to
+ # the wrong version of the directory...
+ await process.wait()
+ if process.returncode:
If p is used for process, you can write this with better encapsulation:
s = getattr(runner_client.JobStatus, 'ERROR' if p.returncode else 'COMPLETED')
_write_status(s, run_dir)
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + cmd,
+ cwd=run_dir,
+ start_new_session=True,
+ stdin=run_log, # XX TODO: should be subprocess.DEVNULL?
+ stdout=run_log,
+ stderr=run_log,
+ env=env,
+ )
+ self.jobs[run_dir] = _JobInfo(
+ run_dir, jhash, runner_client.JobStatus.RUNNING, process
+ )
+ async with process:
+ task_status.started()
+ # XX more race conditions here, in case we're writing to
+ # the wrong version of the directory...
+ await process.wait()
we probably want to log the process status in an error
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + run_dir, jhash, runner_client.JobStatus.RUNNING, process
+ )
+ async with process:
+ task_status.started()
+ # XX more race conditions here, in case we're writing to
+ # the wrong version of the directory...
+ await process.wait()
+ if process.returncode:
+ _write_status(runner_client.JobStatus.ERROR, run_dir)
+ else:
+ _write_status(runner_client.JobStatus.COMPLETED, run_dir)
+ finally:
+ # _write_status is a no-op if there's already a status, so
+ # this really means "if we get here without having written a
+ # status, assume there was some error"
+ _write_status(runner_client.JobStatus.ERROR, run_dir)
definitely want to log this path
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + async def _run_job(
+ self, run_dir, jhash, cmd, *, task_status=trio.TASK_STATUS_IGNORED
+ ):
+ # XX TODO: there are all kinds of race conditions here, e.g. if the
+ # same jid gets started multiple times in parallel... we need to
+ # revisit this once jids are globally unique, and possibly add
+ # features like "if there is another job running with the same
+ # user/sim/job but a different hash, then auto-cancel that other job"
+ with _catch_and_log_errors(Exception, 'error in run_job'):
+ if run_dir in self.jobs:
+ # Right now, I don't know what happens if we reach here while
+ # the previous job is still running. The old job might be
+ # writing to the new job's freshly-initialized run_dir? This
+ # will be fixed once we move away from having server.py write
+ # directly into the run_dir.
+ assert self.jobs[jhash] == jhash
in other places self.jobs is indexed by run_dir.
------------------------------
In sirepo/pkcli/runner.py
<#1504 (comment)>:
> + # the wrong version of the directory...
+ await process.wait()
+ if process.returncode:
+ _write_status(runner_client.JobStatus.ERROR, run_dir)
+ else:
+ _write_status(runner_client.JobStatus.COMPLETED, run_dir)
+ finally:
+ # _write_status is a no-op if there's already a status, so
+ # this really means "if we get here without having written a
+ # status, assume there was some error"
+ _write_status(runner_client.JobStatus.ERROR, run_dir)
+ # Make sure that we clear out the running job info and tell
+ # everyone the job is done, no matter what happened
+ job_info = self.jobs.pop(run_dir, None)
+ if job_info is not None:
+ job_info.finished.set()
need to validate the jhash, because could be a different job
I see that lock_dict serializes rpc calls. I think _run_job is in a separate thread, which means that the comments about jhash in _run_job are valid. If an exception is thrown, there's no guarantee that the _run_job thread owns the process. I assume that cancel goes through finally, which is the code I'm concerned about.
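For context, a guess at the shape of LockDict under trio (an assumption about its implementation, not the actual code): one lock per key means RPCs touching the same run_dir are serialized while different run_dirs stay concurrent.

import trio

class LockDict:
    # Hypothetical sketch: create a trio.Lock per key on demand, so
    # `async with lock_dict[request.run_dir]:` serializes operations on
    # one run_dir. A real implementation would also need to drop idle
    # locks to avoid unbounded growth.
    def __init__(self):
        self._locks = {}

    def __getitem__(self, key):
        if key not in self._locks:
            self._locks[key] = trio.Lock()
        return self._locks[key]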
In a pytest test, the normal behavior is for print output to be captured by pytest. It looks like that capturing isn't happening here. Would you rather we remove the chatty prints to keep the tests silent, keep them to make debugging easier, or figure out a way to get pytest's output capturing to work properly?
errors on the runner daemon are now indicated by simply closing the response socket, which is (hopefully) difficult to mess up.
OK, I think I addressed most of your comments. The big thing left is that we still have race conditions around
Together, these will fix the race conditions on disk, and the race conditions in memory. (In particular, note that if we enforce that there are never two jobs running at the same time in the same dir, then all the issues around one job's shutdown overlapping with another job's startup and causing stuff to be scribbled on the wrong job_info object just go away.) This will require some rearranging:
This is all doable, but it's complicated enough that I think we should bump it into SOW-3.
I agree about the race condition and SOW-3. I'd like to discuss the solution, because we have to start decoupling based on the runner class (NERSC, subprocess, or Docker). A key factor for NERSC (coming up soon now) is that a "job" is really anything that touches run_dir. This is also key for the runner in general, because we want to run anything to any "codes" inside a separate container to ensure encapsulation and to allow sirepo to migrate to py3. Therefore, I think we need to generalize the job to "do something with this particular run_dir, which might already exist."
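To make that concrete, a hedged sketch of the decoupling; every class and method name here is hypothetical, not from this PR:

class RunnerBase:
    # Unifying contract: "do something with this run_dir, which might
    # already exist."
    async def run_in_dir(self, run_dir, jhash, cmd):
        raise NotImplementedError

class SubprocessRunner(RunnerBase):
    async def run_in_dir(self, run_dir, jhash, cmd):
        ...  # trio.Process with cwd=run_dir, as in this PR

class DockerRunner(RunnerBase):
    async def run_in_dir(self, run_dir, jhash, cmd):
        ...  # same contract, run_dir bind-mounted into a container

class NerscRunner(RunnerBase):
    async def run_in_dir(self, run_dir, jhash, cmd):
        ...  # submit a batch job that operates on run_dir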
We should have that as a goal without print, because pytest doesn't tell you what line number the print is on. The reason for pkdp, pkdlog, and pkdc is that I don't have to write anything to explain what's going on, because you can jump right to the line and see what the code is actually doing (as opposed to what the string says it is doing :).
There are numerous bugs around how pytest and pykern interact, e.g. pytest breaks capsys, and a bunch of nulls problems that I have yet to write up. Anyway, I agree with the need for better logging. Let's leave in the pkdlogs as is for now, because you don't see any test output when running python setup.py tests, which is the most common mode. Having it chatty for an individual test run is tolerable until radiasoft/pykern#31 is fixed.
I think we'll want to distinguish between the current kind of job, and the data extraction jobs, because they have fundamentally different interactions with the run dir lifecycle. For example, the current "status check" operation is basically a check of what kind of run dir we have; checking on the status of a particular data extraction job will need very different inputs and outputs.
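A speculative sketch of how those two request shapes might differ; the action and field names are hypothetical, though run_dir and jhash follow the request fields visible elsewhere in this PR:

# Run-dir status check: really asks "what kind of run dir is this?",
# keyed by the directory and the parameter hash.
run_status_request = {
    'action': 'report_job_status',
    'run_dir': '/path/to/run_dir',
    'jhash': '<reportParametersHash>',
}

# Data-extraction job status: keyed by the extraction job itself,
# with its own inputs and result payload.
extract_status_request = {
    'action': 'extract_job_status',
    'job_id': '<extraction job id>',
}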
Sounds good.
So far this is very much an initial sketch, but it has the split between the service and the API, the json-over-Unix-socket API, and a very rudimentary attempt at redoing api_runCancel based on the API (hidden behind a feature flag).
@robnagler: This is completely untested, because it depends on #1499 – do you want me to take a stab at rearranging the db_dir config, or do you want to do it? (I think I could get something working, but I feel like without some hints it would probably be different than how you'd do it :-).)