Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restarting the daemon excepts all jobs (aiida-core 1.6, python 3.7) #4648

Closed
giovannipizzi opened this issue Jan 9, 2021 · 57 comments · Fixed by #4792
Closed

Restarting the daemon excepts all jobs (aiida-core 1.6, python 3.7) #4648

giovannipizzi opened this issue Jan 9, 2021 · 57 comments · Fixed by #4792

Comments

@giovannipizzi
Copy link
Member

giovannipizzi commented Jan 9, 2021

Edit: this issue only occurs in python 3.7, so unitl the issue is fuly resolved, a "fix" is to upgrade to python 3.8


Steps to reproduce

Steps to reproduce the behavior:

  1. Submit a job to the daemon
  2. Start the daemon, if not already running
  3. Stop or restart the daemon

Expected behavior

The daemon stops in a reasonably short time, and the job is "frozen" and will safely continue when the daemon restarts.

Actual problematic behaviour

Instead, when stopping and/or restarting the deamon, I get a TIMEOUT.
Then, I get things like this from verdi process list:

  PK  Created    Process label    Process State    Process status
----  ---------  ---------------  ---------------  -------------------------------------
 184  20h ago    PwCalculation    ⨯ Excepted       Transport task update was interrupted
 189  1h ago     PwCalculation    ⨯ Excepted       Transport task submit was interrupted

and verdi process report shows this:

$ verdi process report 184
*** 184: None
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 3 LOG MESSAGES:
+-> ERROR at 2021-01-08 18:53:56.557646+00:00
 | Traceback (most recent call last):
 |   File "/home/pizzi/git/aiida-core/aiida/engine/utils.py", line 171, in exponential_backoff_retry
 |     result = await coro()
 |   File "/home/pizzi/git/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 177, in do_update
 |     job_info = await cancellable.with_interrupt(update_request)
 |   File "/home/pizzi/git/aiida-core/aiida/engine/utils.py", line 87, in with_interrupt
 |     result = await next(wait_iter)
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 560, in _wait_for_one
 |     return f.result()  # May raise f.exception().
 | concurrent.futures._base.CancelledError
+-> REPORT at 2021-01-08 18:53:36.744091+00:00
 | [184|PwCalculation|on_except]: Traceback (most recent call last):
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 1072, in step
 |     next_state = await self._run_task(self._state.execute)
 |   File "/home/pizzi/.virtualenvs/aiida-dev/lib/python3.7/site-packages/plumpy/processes.py", line 498, in _run_task
 |     result = await coro(*args, **kwargs)
 |   File "/home/pizzi/git/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 358, in execute
 |     job_done = await self._launch_task(task_update_job, node, self.process.runner.job_manager)
 |   File "/home/pizzi/git/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 394, in _launch_task
 |     result = await self._task
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
 |     future.result()
 | concurrent.futures._base.CancelledError
+-> ERROR at 2021-01-08 18:53:36.535279+00:00
 | Traceback (most recent call last):
 |   File "/home/pizzi/git/aiida-core/aiida/engine/utils.py", line 171, in exponential_backoff_retry
 |     result = await coro()
 |   File "/home/pizzi/git/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 177, in do_update
 |     job_info = await cancellable.with_interrupt(update_request)
 |   File "/home/pizzi/git/aiida-core/aiida/engine/utils.py", line 87, in with_interrupt
 |     result = await next(wait_iter)
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 556, in _wait_for_one
 |     f = await done.get()
 |   File "/usr/lib/python3.7/asyncio/queues.py", line 159, in get
 |     await getter
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
 |     yield self  # This tells Task to wait for completion.
 |   File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
 |     future.result()
 |   File "/usr/lib/python3.7/asyncio/futures.py", line 176, in result
 |     raise CancelledError
 | concurrent.futures._base.CancelledError

Further notes

I think this is actually the main problem that also manifested itself in #4595 and #4345

I think that many interruptions that should be "safe" are instead considered exceptions (here in the specific case of stopping the daemon, but also in other cases like SSH errors like maybe in #4345).

@muhrin @sphuber @unkcpz @chrisjsewell

@unkcpz
Copy link
Member

unkcpz commented Jan 10, 2021 via email

@giovannipizzi
Copy link
Member Author

Hi @unkcpz - yes, the calculation crashed in my case also if it the daemon was already running. If you run the daemon after submitting, instead, it picks up correctly the job.
So I agree with you this is something happening when trying to pause the job. My guess is that the exception might be thrown, but if it originates from a shutdown of the daemon it should not except.
Note also #4649, even if maybe it is unrelated?

@unkcpz
Copy link
Member

unkcpz commented Jan 13, 2021

hi @giovannipizzi I cannot reproduce this with a more simple example like:

from aiida.engine import run, submit

CODENAME_ADD = 'add@localhost'
ArithmeticAddCalculation = CalculationFactory('arithmetic.add')

code = load_code(CODENAME_ADD)
inputs = {
    'x': Int(1),
    'y': Int(2),
    'code': code,
}

submit(ArithmeticAddCalculation, **inputs)

I think it is all the same right? Moreover, I have run my sssp workflow in latest develop branch which also have PwCalculation and verdi daemon stop very often, but never experience your problem.

@giovannipizzi
Copy link
Member Author

Could you maybe try with an SSH computer/transport, where things are slower and there are non-zero safe intervals etc.? Maybe on a local transport they are all set to zero and the error occurs there.

In the meantime, I'll probably need to retry with the most recent version of develop.

@chrisjsewell chrisjsewell added this to the v1.6.0 milestone Jan 13, 2021
@mbercx
Copy link
Member

mbercx commented Jan 18, 2021

I've tried to reproduce this error as well, but don't seem to run into any issues. However, trying to restart the daemon, the operation times out. Here are the steps I execute:

  1. Submit a PwRelaxWorkChain. The code for this work chain is set up to run on a remote computer via SSH transport (Piz Daint).
  2. Check the calculation:
$ verdi process list
PK  Created    Process label     Process State    Process status
----  ---------  ----------------  ---------------  ---------------------------------------
1775  11s ago    PwRelaxWorkChain  ⏵ Waiting        Waiting for child processes: 1778
1778  10s ago    PwBaseWorkChain   ⏵ Waiting        Waiting for child processes: 1783
1783  9s ago     PwCalculation     ⏵ Waiting        Waiting for transport task: upload
  1. Restart the daemon:
$ verdi daemon restart --reset
Profile: dev2
Waiting for the daemon to shut down... TIMEOUT
Starting the daemon... TIMEOUT

So here the connection to the daemon times out. However, it is stopped:

$ verdi status
 ✔ config dir:  /home/mbercx/.virtualenvs/aiida-dev/.aiida
 ✔ profile:     On profile dev2
 ✔ repository:  /home/mbercx/.virtualenvs/aiida-dev/.aiida/repository/dev2
 ✔ postgres:    Connected as mbercx@localhost:5432
 ✔ rabbitmq:    Connected as amqp://guest:guest@127.0.0.1:5672?heartbeat=600
 ⏺ daemon:      The daemon is not running

At this point the PwCalculation is in the transport task: submit state:

$ verdi process list
  PK  Created    Process label     Process State    Process status
----  ---------  ----------------  ---------------  ---------------------------------------
1775  1m ago     PwRelaxWorkChain  ⏵ Waiting        Waiting for child processes: 1778
1778  1m ago     PwBaseWorkChain   ⏵ Waiting        Waiting for child processes: 1783
1783  1m ago     PwCalculation     ⏵ Waiting        Waiting for transport task: submit
  1. Start the daemon:
$ verdi daemon start
Starting the daemon... RUNNING

After waiting a bit, the daemon picks up the PwCalculation:

$ verdi process list
  PK  Created    Process label     Process State    Process status
----  ---------  ----------------  ---------------  ---------------------------------------
1775  13m ago    PwRelaxWorkChain  ⏵ Waiting        Waiting for child processes: 1778
1778  13m ago    PwBaseWorkChain   ⏵ Waiting        Waiting for child processes: 1783
1783  13m ago    PwCalculation     ⏵ Waiting        Monitoring scheduler: job state RUNNING

The log also looks fine:

$ verdi process report 1783
*** 1783: CalcJobState.WITHSCHEDULER, scheduler state: JobState.RUNNING
*** Scheduler output: N/A
*** Scheduler errors: N/A
*** 0 LOG MESSAGES

EDIT: I tried to reproduce the issue in Quantum Mobile, but no timeout there. After restarting my rabbitmq server on my work station the issue also seems to be fixed! I spoke too soon, seems I was (accidentally) in a different environment here. I keep on getting TIMEOUTs for my daemon when trying to restart (or stop) it. I tried reinstalling my Python environment but the problem persists. When I install aiida-core==1.5.2 however, the problem is no longer there. Should I open another issue for this?

@unkcpz
Copy link
Member

unkcpz commented Jan 21, 2021

@mbercx thanks for the report! I have the same timeout issue but just cannot assure the issue comes from the new version, thanks for compare and check!

Should I open another issue for this?

It doesn't seem necessary for now, I working on it. I think they have the same problem root, let's see if solve this one will fix the other ;)

@mbercx
Copy link
Member

mbercx commented Jan 21, 2021

It doesn't seem necessary for now, I working on it. I think they have the same problem root, let's see if solve this one will fix the other ;)

@ramirezfranciscof was having similar issues, so he opened #4667 where you can find some more information on the daemon log (the error there is the same for both of us). If the problems do have the same root, you can now fix two issues in one PR! 😅

@mbercx
Copy link
Member

mbercx commented Jan 21, 2021

Could you maybe try with an SSH computer/transport, where things are slower and there are non-zero safe intervals etc.? Maybe on a local transport they are all set to zero and the error occurs there.

I was able to reproduce the issue when increasing the safe-interval of the localhost computer:

$  verdi computer configure local localhost --safe-interval 10 -n

Then, stop the daemon and run the simple example you showed previously:

from aiida.engine import submit

ArithmeticAddCalculation = CalculationFactory('arithmetic.add')

inputs = {
    'x': Int(1),
    'y': Int(2),
    'code': load_code('add@localhost'),
}

submit(ArithmeticAddCalculation, **inputs)

Next, start the daemon, sleep for 2 seconds and restart it:

$ verdi daemon start; sleep 2; verdi daemon restart --reset
Starting the daemon... RUNNING
Profile: generic
Waiting for the daemon to shut down... OK
Starting the daemon... RUNNING
(dev-test) max@7e5d5794c8bd:~/codes/aiida-core$ verdi process list -ap1
  PK  Created    Process label             Process State    Process status
----  ---------  ------------------------  ---------------  -------------------------------------
 189  38s ago    ArithmeticAddCalculation  ⨯ Excepted       Transport task upload was interrupted

Total results: 1

Info: last time an entry changed state: 8s ago (at 11:27:25 on 2021-01-21)

Now the calcjob has been Excepted during the transport task. Here's the error log of the daemon:

01/21/2021 11:27:25 AM <2527> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [REPORT] [189|ArithmeticAddCalc
ulation|on_except]: Traceback (most recent call last):
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/plumpy/processes.py", line 1171, in step
    next_state = await self._run_task(self._state.execute)
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/plumpy/processes.py", line 549, in _run_task
    result = await coro(*args, **kwargs)
  File "/home/max/codes/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 339, in execute
    skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue)
  File "/home/max/codes/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 394, in _launch_task
    result = await self._task
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
concurrent.futures._base.CancelledError

01/21/2021 11:27:25 AM <2527> aiida.engine.transports: [ERROR] Exception whilst using transport:
Traceback (most recent call last):
  File "/home/max/codes/aiida-core/aiida/engine/transports.py", line 101, in request_transport
    yield transport_request.future
  File "/home/max/codes/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 70, in do_upload
    transport = await cancellable.with_interrupt(request)
  File "/home/max/codes/aiida-core/aiida/engine/utils.py", line 87, in with_interrupt
    result = await next(wait_iter)
  File "/usr/lib/python3.7/asyncio/tasks.py", line 556, in _wait_for_one
    f = await done.get()
  File "/usr/lib/python3.7/asyncio/queues.py", line 159, in get
    await getter
  File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
  File "/usr/lib/python3.7/asyncio/futures.py", line 176, in result
    raise CancelledError
concurrent.futures._base.CancelledError

01/21/2021 11:27:28 AM <2621> kiwipy.rmq.tasks: [ERROR] Exception occurred while processing task.
Traceback (most recent call last):
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/kiwipy/rmq/tasks.py", line 166, in _on_task
    result = await result
  File "/usr/lib/python3.7/asyncio/futures.py", line 263, in __await__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
  File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
    raise self._exception
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 252, in done
    result = kiwi_future.result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/kiwipy/futures.py", line 54, in capture_exceptions
    yield
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/plumpy/communications.py", line 48, in on_done
    result = plum_future.result()
  File "/usr/lib/python3.7/asyncio/futures.py", line 181, in result
    raise self._exception
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/kiwipy/futures.py", line 54, in capture_exceptions
    yield
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/plumpy/futures.py", line 73, in run_task
    res = await coro()
  File "/home/max/.virtualenvs/dev-test/lib/python3.7/site-packages/plumpy/process_comms.py", line 539, in __call__
    return await self._continue(communicator, **task.get(TASK_ARGS, {}))
  File "/home/max/codes/aiida-core/aiida/manage/external/rmq.py", line 203, in _continue
    return future.result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
aiida.engine.exceptions.PastException: concurrent.futures._base.CancelledError

Moreover, very funky stuff is happening after this. It tells me the daemon is not running (without me stopping it):

$ verdi daemon status
Profile: generic
The daemon is not running

But it actually is! When submitting another ArithmeticAddCalculation, the daemon is definitely active:

$ verdi process list -ap1
  PK  Created    Process label             Process State    Process status
----  ---------  ------------------------  ---------------  -------------------------------------
 189  5m ago     ArithmeticAddCalculation  ⨯ Excepted       Transport task upload was interrupted
 192  6s ago     ArithmeticAddCalculation  ⏵ Waiting        Waiting for transport task: upload

Total results: 2

Info: last time an entry changed state: 6s ago (at 11:32:28 on 2021-01-21)
Warning: the daemon is not running
(dev-test) max@7e5d5794c8bd:~/.aiida/daemon/log$ verdi process list -ap1
  PK  Created    Process label             Process State    Process status
----  ---------  ------------------------  ---------------  -------------------------------------
 189  5m ago     ArithmeticAddCalculation  ⨯ Excepted       Transport task upload was interrupted
 192  13s ago    ArithmeticAddCalculation  ⏵ Waiting        Waiting for transport task: submit

Total results: 2

Info: last time an entry changed state: 2s ago (at 11:32:38 on 2021-01-21)
Warning: the daemon is not running

Finally, note that here restarting the daemon did not encounter any TIMEOUT problems. Looking at the log, the error also seems different. I was able to reproduce this TIMEOUT problem in QM as well, but then using a PwBaseWorkChain. I'll post the steps to reproduce in @ramirezfranciscof's issue (#4667).

@mbercx
Copy link
Member

mbercx commented Jan 28, 2021

Note: the TIMEOUT problem, when reproduced in the QM environment, also had a similar error trace as in my previous message. So both issues are related to the same root cause, as @unkcpz thought.

You can also get TIMEOUT issues when running using Python 3.6, however, as discussed in #4667. Of course, from aiida-core v1.6, Python 3.6 is no longer supported.

@chrisjsewell
Copy link
Member

@giovannipizzi/@mbercx I cannot reproduce the issue in https://github.com/aiidateam/aiida-integration-tests:

$ verdi daemon stop
$ aiida-sleep calc -n 2 --submit -p 100000
$ verdi daemon start
$ verdi daemon stop
$ verdi process list
1776  1m ago     SleepCalculation  ⏵ Waiting        Waiting for transport task: upload
1779  58s ago    SleepCalculation  ⏵ Waiting        Waiting for transport task: upload
$ verdi daemon start
$ verdi daemon stop
$ verdi process list
1776  1m ago     SleepCalculation  ⏵ Waiting        Waiting for transport task: submit
1779  1m ago     SleepCalculation  ⏵ Waiting        Waiting for transport task: submit

I feel it may be related to #4692, but then again I couldn't reproduce it before this commit either

As per #4667, can you confirm that you were not using python 3.6

Then can you try to reproduce on https://github.com/aiidateam/aiida-integration-tests.
If it cannot be reproduced there then I can't fix it.

@chrisjsewell
Copy link
Member

yeh and just to double-check added the add@local code to aiidateam/aiida-integration-tests and run it exactly as #4648 (comment), and it was fine:

$ verdi process list -ap1
  PK  Created    Process label             Process State    Process status
----  ---------  ------------------------  ---------------  -------------------------------------
8517  42s ago    ArithmeticAddCalculation  ⏵ Waiting         Waiting for transport task: upload

so @mbercx, yeh give it a go again with the latest develop and if there is still an issue, perhaps try it on aiidateam/aiida-integration-tests and also compare your local setup to thats, e.g. what version of RabbitMQ you are using

@mjclarke94
Copy link
Contributor

Not sure if the same issue or if needs a new issue, but I think I am seeing a similar problem using aiida-lammps, latest aiida-core@develop.

A number of jobs seem to not perform the retrieve task correctly if a daemon restart occurs:

   PK  Created    Process label       Process State     Process status
-----  ---------  ------------------  ----------------  ---------------------------------------
11619  21h ago    MdMultiCalculation  ⏹ Finished [0]
11622  21h ago    MdMultiCalculation  ⏹ Finished [0]
11625  21h ago    MdMultiCalculation  ⏹ Finished [0]
11628  21h ago    MdMultiCalculation  ⏹ Finished [203]
11631  21h ago    MdMultiCalculation  ⏹ Finished [203]
11634  21h ago    MdMultiCalculation  ⏹ Finished [203]
...
11870  21h ago    MdMultiCalculation  ⏹ Finished [0]
11873  21h ago    MdMultiCalculation  ⏹ Finished [0]
11876  21h ago    MdMultiCalculation  ⏵ Waiting         Monitoring scheduler: job state RUNNING
11879  21h ago    MdMultiCalculation  ⏵ Waiting         Waiting for transport task: retrieve

Checking on the remote machine, the files corresponding to the error aren't missing so it implies an issue with the transport rather than the calculation itself. Different behaviour in that it isn't excepting but instead just failing to retrieve/parse some files.

@chrisjsewell
Copy link
Member

chrisjsewell commented Feb 25, 2021

@mjclarke94 good to hear you are using aiida-lammps 😄 what do you get if you do verdi process report 11628 also could you find where your .aiida folder is and have a look at the content of .aiida/daemon/aiida-profilename.log

@mjclarke94
Copy link
Contributor

@chrisjsewell Haha, that and aiida-gulp. I think you're basically my unwitting supervisor at this point.

verdi process report 11628 gives a bunch of stuff from the scheduler (it's very verbose even if everything is fine...) and then

*** 1 LOG MESSAGES:
+-> WARNING at 2021-02-25 12:28:40.944371+00:00
 | output parser returned exit code<203>: the trajectory output file was not found

However the file definitely is there:

❯ verdi calcjob gotocomputer 11628
Info: going to the remote work directory...

(chem) [mmm0565@login10 2438-5099-432d-9bc2-fe4ebdfbeb3b]$ ls
_aiidasubmit.sh  cell_transform.npy  input.data  input.in  log.lammps  main_nvt_simulation-trajectory.lammpstrj  _scheduler-stderr.txt  _scheduler-stdout.txt

(chem) [mmm0565@login10 2438-5099-432d-9bc2-fe4ebdfbeb3b]$ head main_nvt_simulation-trajectory.lammpstrj
ITEM: TIMESTEP
100000
ITEM: NUMBER OF ATOMS
11451
ITEM: BOX BOUNDS xy xz yz pp pp pp
-4.1420710587764920e-01 4.1216942282878172e+01 0.0000000000000000e+00
-3.8280899590781620e-01 3.8800338155907703e+01 0.0000000000000000e+00
-9.8744374623459463e-01 8.9907093575934269e+01 0.0000000000000000e+00
ITEM: ATOMS element x y z q id type vx vy vz
Li -0.145018 -0.0701078 2.38145 1 1 1 10.6792 -14.403 21.4334

Assuming I've hit on the relevant bit of the log:


 The above exception was the direct cause of the following exception:

 Traceback (most recent call last):
   File "/Users/matt/.pyenv/versions/3.9.0/bin/verdi", line 8, in <module>
     sys.exit(verdi())
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/click/core.py", line 829, in __call__
     return self.main(*args, **kwargs)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/click/core.py", line 782, in main
     rv = self.invoke(ctx)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/click/core.py", line 1259, in invoke
     return _process_result(sub_ctx.command.invoke(sub_ctx))
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/click/core.py", line 1259, in invoke
     return _process_result(sub_ctx.command.invoke(sub_ctx))
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/click/core.py", line 1066, in invoke
     return ctx.invoke(self.callback, **ctx.params)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/click/core.py", line 610, in invoke
     return callback(*args, **kwargs)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiida/cmdline/utils/decorators.py", line 65, in wrapper
     return wrapped(*args, **kwargs)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiida/cmdline/commands/cmd_devel.py", line 74, in devel_run_daemon
     start_daemon()
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiida/engine/daemon/runner.py", line 51, in start_daemon
     runner = manager.create_daemon_runner()
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiida/manage/manager.py", line 302, in create_daemon_runner
     runner = self.create_runner(rmq_submit=True, loop=loop)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiida/manage/manager.py", line 281, in create_runner
     settings['communicator'] = self.get_communicator()
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiida/manage/manager.py", line 166, in get_communicator
     self._communicator = self.create_communicator()
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiida/manage/manager.py", line 200, in create_communicator
     return kiwipy.rmq.RmqThreadCommunicator.connect(
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 51, in connect
     comm = cls(
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/kiwipy/rmq/threadcomms.py", line 108, in __init__
     self._communicator = self._loop_scheduler.await_(
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/pytray/aiothreads.py", line 155, in await_
     return self.await_submit(awaitable).result(timeout=self.task_timeout)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/concurrent/futures/_base.py", line 440, in result
     return self.__get_result()
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/concurrent/futures/_base.py", line 389, in __get_result
     raise self._exception
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/pytray/aiothreads.py", line 35, in done
     result = done_future.result()
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/pytray/aiothreads.py", line 169, in proxy
     return await awaitable
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/kiwipy/rmq/communicator.py", line 551, in async_connect
     connection = await connection_factory(**connection_params)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aio_pika/robust_connection.py", line 271, in connect_robust
     return await connect(
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aio_pika/connection.py", line 332, in connect
     await connection.connect(
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aio_pika/robust_connection.py", line 127, in connect
     result = await super().connect(
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aio_pika/connection.py", line 120, in connect
     self.connection = await asyncio.wait_for(
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/asyncio/tasks.py", line 440, in wait_for
     return await fut
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aio_pika/connection.py", line 105, in _make_connection
     connection = await aiormq.connect(self.url, **kwargs)
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiormq/connection.py", line 542, in connect
     await connection.connect(client_properties or {})
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiormq/base.py", line 168, in wrap
     return await self.create_task(func(self, *args, **kwargs))
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiormq/base.py", line 25, in __inner
     return await self.task
   File "/Users/matt/.pyenv/versions/3.9.0/lib/python3.9/site-packages/aiormq/connection.py", line 232, in connect
     raise ConnectionError(*e.args) from e
 ConnectionError: [Errno 61] Connect call failed ('127.0.0.1', 5672)
 02/25/2021 12:28:27 PM <25137> aiida.engine.processes.calcjobs.tasks: [WARNING] CalcJob<11628> already marked as PARSING, skipping task_retrieve_job
 02/25/2021 12:28:27 PM <25137> aiida.engine.processes.calcjobs.tasks: [WARNING] CalcJob<11631> already marked as PARSING, skipping task_retrieve_job
 02/25/2021 12:28:27 PM <25137> aiida.engine.processes.calcjobs.tasks: [WARNING] CalcJob<11634> already marked as PARSING, skipping task_retrieve_job
 02/25/2021 12:28:27 PM <25137> aiida.engine.processes.calcjobs.tasks: [WARNING] CalcJob<11637> already marked as PARSING, skipping task_retrieve_job
 02/25/2021 12:28:27 PM <25137> aiida.engine.processes.calcjobs.tasks: [WARNING] CalcJob<11640> already marked as PARSING, skipping task_retrieve_job
 02/25/2021 12:28:27 PM <25137> aiida.engine.processes.calcjobs.tasks: [WARNING] CalcJob<11643> already marked as PARSING, skipping task_retrieve_job
 02/25/2021 12:28:40 PM <25137> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [WARNING] output parser returned exit code<203>: the trajectory output file was not found
 02/25/2021 12:28:41 PM <25137> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [WARNING] output parser returned exit code<203>: the trajectory output file was not found
 02/25/2021 12:28:41 PM <25137> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [WARNING] output parser returned exit code<203>: the trajectory output file was not found
 02/25/2021 12:28:41 PM <25137> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [WARNING] output parser returned exit code<203>: the trajectory output file was not found
 02/25/2021 12:28:41 PM <25137> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [WARNING] output parser returned exit code<203>: the trajectory output file was not found
 02/25/2021 12:28:43 PM <25137> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [WARNING] output parser returned exit code<203>: the trajectory output file was not found

So lots of messages relating to jobs already being marked as parsing and then others failing to parse.

@chrisjsewell
Copy link
Member

😆

so I don't think the ConnectionError is the cause, that will have just killed a daemon worker before it starts.
But yes I have also noticed a lot of these warnings: already marked as XXXXXX, skipping XXXX_job in the log, and certainly in this case it seems to imply the process has been prematurely moved to the PARSING phase and skipped the correct file retrieval.

@sphuber would you expect to see such warnings in the log?

@sphuber
Copy link
Contributor

sphuber commented Feb 26, 2021

The parser checks the contents of the retrieved node for the trajectory file, not the remote_folder node, so it is not wholly surprising that you get that parser warning yet it still seems to be on the remote. What this rather indicates is that the file failed to be retrieved. Could you check the contents of the retrieved output node of one of those calculations with a 203? The warning CalcJob<11628> already marked as PARSING, skipping task_retrieve_job is emitted when the retrieve transport task is called, but the state of the CalcJob is already in parsing. This is a safety to prevent the task from happening twice. The parsing state is set at the end of the retrieve task at which point the retrieved node should have been fully retrieved and attached to the CalcJobNode, which is why the retrieve task should be skippable in principle.

@mjclarke94
Copy link
Contributor

In [1]: node_with_error = load_node(11628)
   ...: node_with_error.outputs.retrieved.list_object_names()
Out[1]:
['_scheduler-stderr.txt',
 '_scheduler-stdout.txt',
 'cell_transform.npy',
 'log.lammps']

In [2]: node_without_error = load_node(11763)
   ...:    ...: node_without_error.outputs.retrieved.list_object_names()
Out[2]:
['_scheduler-stderr.txt',
 '_scheduler-stdout.txt',
 'cell_transform.npy',
 'log.lammps']

No trajectory file in either a 203 or 0 code process but I think the trajectory file is only retrieved temporarily and a LammpsTrajectory object is used to store it instead.

@chrisjsewell
Copy link
Member

ah ok thanks @mjclarke94

The parsing state is set at the end of the retrieve task at which point the retrieved node should have been fully retrieved and attached to the CalcJobNode, which is why the retrieve task should be skippable in principle.

@sphuber perhaps then this safety mechanism does not account for temporary retrieved files?

@sphuber
Copy link
Contributor

sphuber commented Feb 26, 2021

Ah, that must mean the plugin uses the retrieved temporary feature such that the trajectory can be parsed and stored, but the original file is then discarded. This makes sense since these files typically get big and you don't want the content duplicated. Do the 203's only seem to occur after a daemon restart? Or have you also seen this problem occur when the daemon is running normally?

@sphuber
Copy link
Contributor

sphuber commented Feb 26, 2021

Ah, that must mean the plugin uses the retrieved temporary feature such that the trajectory can be parsed and stored, but the original file is then discarded. This makes sense since these files typically get big and you don't want the content duplicated. Do the 203's only seem to occur after a daemon restart? Or have you also seen this problem occur when the daemon is running normally?

@sphuber perhaps then this safety mechanism does not account for temporary retrieved files?

It should, but since he is on develop and I think there has been quite some refactoring/changes in the transport tasks, this may have introduced a bug. Since the temporary retrieved functionality is difficult to test, especially when the bug involves a subtle task restart, it is quite possible this was missed by the tests.

@mjclarke94
Copy link
Contributor

mjclarke94 commented Feb 26, 2021

@sphuber I haven't seen the issue outside of a daemon restart, but might be conflating it with another issue.

Here's a less compact verdi process list output. The large chunk of 203's all correlate with a daemon restart, and all jobs were created in a for loop so everything barring minor tweaks to the inputs is the same.

11392  1D ago     OptimizeCalculation  ⏹ Finished [0]
11394  1D ago     OptimizeCalculation  ⏹ Finished [0]
11396  1D ago     OptimizeCalculation  ⏹ Finished [0]
11398  1D ago     OptimizeCalculation  ⏹ Finished [0]
11400  1D ago     OptimizeCalculation  ⏹ Finished [0]
11402  1D ago     OptimizeCalculation  ⏹ Finished [0]
11404  1D ago     OptimizeCalculation  ⏹ Finished [0]
11406  1D ago     OptimizeCalculation  ⏹ Finished [0]
11408  1D ago     OptimizeCalculation  ⏹ Finished [0]
11410  1D ago     OptimizeCalculation  ⏹ Finished [0]
11412  1D ago     OptimizeCalculation  ⏹ Finished [0]
11414  1D ago     OptimizeCalculation  ⏹ Finished [0]
11416  1D ago     OptimizeCalculation  ⏹ Finished [0]
11418  1D ago     OptimizeCalculation  ⏹ Finished [0]
11420  1D ago     OptimizeCalculation  ⏹ Finished [0]
11422  1D ago     OptimizeCalculation  ⏹ Finished [0]
11424  1D ago     OptimizeCalculation  ⏹ Finished [0]
11607  1D ago     MdMultiCalculation   ⏹ Finished [0]
11610  1D ago     MdMultiCalculation   ⏹ Finished [0]
11613  1D ago     MdMultiCalculation   ⏹ Finished [0]
11616  1D ago     MdMultiCalculation   ⏹ Finished [0]
11619  1D ago     MdMultiCalculation   ⏹ Finished [0]
11622  1D ago     MdMultiCalculation   ⏹ Finished [0]
11625  1D ago     MdMultiCalculation   ⏹ Finished [0]
11628  1D ago     MdMultiCalculation   ⏹ Finished [203]
11631  1D ago     MdMultiCalculation   ⏹ Finished [203]
11634  1D ago     MdMultiCalculation   ⏹ Finished [203]
11637  1D ago     MdMultiCalculation   ⏹ Finished [203]
11640  1D ago     MdMultiCalculation   ⏹ Finished [203]
11643  1D ago     MdMultiCalculation   ⏹ Finished [203]
11646  1D ago     MdMultiCalculation   ⏹ Finished [0]
11649  1D ago     MdMultiCalculation   ⏹ Finished [203]
11652  1D ago     MdMultiCalculation   ⏹ Finished [203]
11655  1D ago     MdMultiCalculation   ⏹ Finished [203]
11658  1D ago     MdMultiCalculation   ⏹ Finished [203]
11661  1D ago     MdMultiCalculation   ⏹ Finished [203]
11664  1D ago     MdMultiCalculation   ⏹ Finished [203]
11667  1D ago     MdMultiCalculation   ⏹ Finished [203]
11670  1D ago     MdMultiCalculation   ⏹ Finished [203]
11673  1D ago     MdMultiCalculation   ⏹ Finished [203]
11676  1D ago     MdMultiCalculation   ⏹ Finished [203]
11679  1D ago     MdMultiCalculation   ⏹ Finished [203]
11682  1D ago     MdMultiCalculation   ⏹ Finished [203]
11685  1D ago     MdMultiCalculation   ⏹ Finished [203]
11688  1D ago     MdMultiCalculation   ⏹ Finished [203]
11691  1D ago     MdMultiCalculation   ⏹ Finished [203]
11694  1D ago     MdMultiCalculation   ⏹ Finished [203]
11697  1D ago     MdMultiCalculation   ⏹ Finished [203]
11700  1D ago     MdMultiCalculation   ⏹ Finished [203]
11703  1D ago     MdMultiCalculation   ⏹ Finished [203]
11706  1D ago     MdMultiCalculation   ⏹ Finished [203]
11709  1D ago     MdMultiCalculation   ⏹ Finished [203]
11712  1D ago     MdMultiCalculation   ⏹ Finished [203]
11715  1D ago     MdMultiCalculation   ⏹ Finished [203]
11718  1D ago     MdMultiCalculation   ⏹ Finished [203]
11721  1D ago     MdMultiCalculation   ⏹ Finished [203]
11724  1D ago     MdMultiCalculation   ⏹ Finished [203]
11727  1D ago     MdMultiCalculation   ⏹ Finished [203]
11730  1D ago     MdMultiCalculation   ⏹ Finished [203]
11733  1D ago     MdMultiCalculation   ⏹ Finished [203]
11736  1D ago     MdMultiCalculation   ⏹ Finished [203]
11739  1D ago     MdMultiCalculation   ⏹ Finished [203]
11742  1D ago     MdMultiCalculation   ⏹ Finished [203]
11745  1D ago     MdMultiCalculation   ⏹ Finished [203]
11748  1D ago     MdMultiCalculation   ⏹ Finished [203]
11751  1D ago     MdMultiCalculation   ⏹ Finished [203]
11754  1D ago     MdMultiCalculation   ⏹ Finished [203]
11757  1D ago     MdMultiCalculation   ⏹ Finished [203]
11760  1D ago     MdMultiCalculation   ⏹ Finished [0]
11763  1D ago     MdMultiCalculation   ⏹ Finished [0]
11766  1D ago     MdMultiCalculation   ⏹ Finished [0]
11769  1D ago     MdMultiCalculation   ⏹ Finished [0]
11772  1D ago     MdMultiCalculation   ⏹ Finished [0]
11775  1D ago     MdMultiCalculation   ⏹ Finished [0]
11778  1D ago     MdMultiCalculation   ⏹ Finished [0]
11781  1D ago     MdMultiCalculation   ⏹ Finished [0]
11784  1D ago     MdMultiCalculation   ⏹ Finished [0]
11787  1D ago     MdMultiCalculation   ⏹ Finished [0]
11790  1D ago     MdMultiCalculation   ⏹ Finished [0]
11793  1D ago     MdMultiCalculation   ⏹ Finished [0]
11796  1D ago     MdMultiCalculation   ⏹ Finished [0]
11799  1D ago     MdMultiCalculation   ⏹ Finished [0]
11802  1D ago     MdMultiCalculation   ⏹ Finished [0]
11806  1D ago     MdMultiCalculation   ⏹ Finished [0]
11811  1D ago     MdMultiCalculation   ⏹ Finished [0]
11815  1D ago     MdMultiCalculation   ⏹ Finished [0]
11818  1D ago     MdMultiCalculation   ⏹ Finished [0]
11821  1D ago     MdMultiCalculation   ⏹ Finished [0]
11824  1D ago     MdMultiCalculation   ⏹ Finished [0]

@chrisjsewell
Copy link
Member

chrisjsewell commented Feb 26, 2021

@sphuber it seems here:

elif self.data == RETRIEVE_COMMAND:
node.set_process_status(process_status)
# Create a temporary folder that has to be deleted by JobProcess.retrieved after successful parsing
temp_folder = tempfile.mkdtemp()
await self._launch_task(task_retrieve_job, node, transport_queue, temp_folder)
result = self.parse(temp_folder)

the temp_folder is created fresh everytime you reach this part of the code. So if the daemon worker is stopped during the parsing, and a new daemon worker picks up the task when the daemon is restarted, it will get a new temp_folder (and also the old temp_folder will not be removed).

so perhaps the temp_folder location should be stored on the node, e.g. something like

temp_folder = node.get_retrieved_temp_folder()
if not temp_folder or not os.path.exists(temp_folder):
    temp_folder = tempfile.mkdtemp()
    node.set_retrieved_temp_folder(temp_folder)

@sphuber
Copy link
Contributor

sphuber commented Feb 26, 2021

This definitely used to work at some point and without storing the temporary folder on the node. Instead after the restart it would generate a new temp folder and retrieve again. This is because you cannot guarantee that after the restart the original temp folder still exists so you have to accept the inefficiency of retrieving again. Could you please split this off in a new issue, since it has a different cause than the OP. I am convinced this used to work in previous versions, so I was trying to find the commit that introduced it, but haven't found it so far. Cannot look too much further into it now though unfortunately.

@chrisjsewell
Copy link
Member

chrisjsewell commented Feb 26, 2021

I am convinced this used to work in previous versions, so I was trying to find the commit that introduced it, but haven't found it so far.

nah you're just trying to find some else to blame 😜

Could you please split this off in a new issue, since it has a different cause than the OP.

indeed

@sphuber
Copy link
Contributor

sphuber commented Feb 26, 2021

nah you're just trying to find some else to blame

Hey, I didn't say it was the most recent version that broke it nor that the commit I was looking for wouldn't turn out to be mine ;)

@chrisjsewell chrisjsewell changed the title Restarting the daemon in the new asyncio implementation excepts all jobs Restarting the daemon excepts all jobs (aiida-core v1.6, python 3.7) Mar 4, 2021
@chrisjsewell chrisjsewell changed the title Restarting the daemon excepts all jobs (aiida-core v1.6, python 3.7) Restarting the daemon excepts all jobs (aiida-core 1.6, python 3.7) Mar 4, 2021
@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

Looking at the changes in 3.7 -> 3.8, one change that seems like it has the possibility to cause this is:

https://docs.python.org/3/whatsnew/3.8.html#asyncio

The exception asyncio.CancelledError now inherits from BaseException rather than Exception and no longer inherits from concurrent.futures.CancelledError.

perhaps somewhere a CancelledError is being incorrectly caught/not caught

@ltalirz
Copy link
Member

ltalirz commented Mar 4, 2021

Thanks @chrisjsewell , this does seem likely.

I'm looking at this code in aiida-core that is responsible for the exception under python 3.7.9

        except (plumpy.process_states.Interruption, plumpy.futures.CancelledError):
            node.set_process_status(f'Transport task {command} was interrupted')
            raise

The plumpy.futures.CancelledError actually goes over kiwipy.futures.CancelledError to concurrent.futures.CancelledError.

As mentioned here, as of python 3.8 this is a mistake - it should be using asyncio.CancelledError.

However, I fear that "fixing" this will actually make the exception happen in python 3.8 as well ;-)
The more interesting question is why the code works when this exception is ignored - and whether it should actually be raised.

@chrisjsewell
Copy link
Member

so if you add (plumpy.process_states.Interruption, plumpy.futures.CancelledError, asyncio.CancelledError) here, can we replicate the issue in python 3.8?

@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

in that same file (aiida/engine/processes/calcjobs/tasks.py) there is also for each task:

    except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):  # pylint: disable=try-except-raise
        raise
    except Exception:

which again would appear to have divergent behaviour between python 3.7 and 3.8

@ltalirz
Copy link
Member

ltalirz commented Mar 4, 2021

I've replaced every instance of plumpy.futures.CancelledError by plumpy.futures.CancelledError, asyncio.CancelledError in aiida/engine/processes/calcjobs/tasks.py (and import asyncio).

Weirdly, though, this does not seem to lead to the exception in python 3.8 - the job still goes through fine.
(and it does not fix python 3.7 either)

@chrisjsewell
Copy link
Member

perhaps now replace all except Exception: with except (Exception, asyncio.CancelledError): 😬

@ltalirz
Copy link
Member

ltalirz commented Mar 4, 2021

perhaps now replace all except Exception: with except (Exception, asyncio.CancelledError): 😬

I checked; this also doesn't change anything.
It seems to me that the difference is not in how the exception is being handled in tasks.py but that it is thrown in python 3.7 and not thrown (or caught somewhere else) in python 3.8

@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

Just to clarify, searching for CancelledError in the packages:

aiida:

9 results - 1 file

aiida/engine/processes/calcjobs/tasks.py:
   92      try:
   93          logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
   94:         ignore_exceptions = (plumpy.futures.CancelledError, PreSubmitException, plumpy.process_states.Interruption)
   95          skip_submit = await exponential_backoff_retry(
   96              do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions

   98      except PreSubmitException:
   99          raise
  100:     except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):
  101          raise
  102      except Exception:

  140      try:
  141          logger.info(f'scheduled request to submit CalcJob<{node.pk}>')
  142:         ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
  143          result = await exponential_backoff_retry(
  144              do_submit, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
  145          )
  146:     except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):  # pylint: disable=try-except-raise
  147          raise
  148      except Exception:

  199      try:
  200          logger.info(f'scheduled request to update CalcJob<{node.pk}>')
  201:         ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
  202          job_done = await exponential_backoff_retry(
  203              do_update, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
  204          )
  205:     except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):  # pylint: disable=try-except-raise
  206          raise
  207      except Exception:

  265      try:
  266          logger.info(f'scheduled request to retrieve CalcJob<{node.pk}>')
  267:         ignore_exceptions = (plumpy.futures.CancelledError, plumpy.process_states.Interruption)
  268          result = await exponential_backoff_retry(
  269              do_retrieve, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
  270          )
  271:     except (plumpy.futures.CancelledError, plumpy.process_states.Interruption):  # pylint: disable=try-except-raise
  272          raise
  273      except Exception:

  407                  logger.warning(f'killed CalcJob<{node.pk}> but async future was None')
  408              raise
  409:         except (plumpy.process_states.Interruption, plumpy.futures.CancelledError):
  410              node.set_process_status(f'Transport task {command} was interrupted')
  411              raise

plumpy:

3 results - 1 file

plumpy/futures.py:
   8  import kiwipy
   9  
  10: __all__ = ['Future', 'gather', 'chain', 'copy_future', 'CancelledError', 'create_task']
  11  
  12: CancelledError = kiwipy.CancelledError
  13  
  14  

kiwipy:

6 results - 4 files

kiwipy/futures.py:
   4  import logging
   5  
   6: __all__ = 'Future', 'chain', 'copy_future', 'CancelledError', 'capture_exceptions', 'wait', 'as_completed'
   7  
   8  _LOGGER = logging.getLogger(__name__)
   9  
  10: CancelledError = concurrent.futures.CancelledError
  11  wait = concurrent.futures.wait  # pylint: disable=invalid-name
  12  as_completed = concurrent.futures.as_completed  # pylint: disable=invalid-name

kiwipy/rmq/communicator.py:
  252                  await self._send_response(reply_to, correlation_id, utils.pending_response())
  253                  future = await future
  254:         except kiwipy.CancelledError as exc:
  255              # Send out a cancelled response
  256              await self._send_response(reply_to, correlation_id, utils.cancelled_response(str(exc)))

kiwipy/rmq/tasks.py:
  168                      # Task was rejected by this subscriber, keep trying
  169                      continue
  170:                 except kiwipy.CancelledError:
  171                      # The subscriber has cancelled their processing of the task
  172                      outcome.cancel()

kiwipy/rmq/threadcomms.py:
  259              try:
  260                  result = kiwi_future.result()
  261:             except concurrent.futures.CancelledError:
  262                  self._loop.call_soon_threadsafe(aio_future.cancel)
  263              except Exception as exc:  # pylint: disable=broad-except

@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

Another potential place, is in aiida/engine/utils.py:

def interruptable_task(
    coro: Callable[[InterruptableFuture], Awaitable[Any]],
    loop: Optional[asyncio.AbstractEventLoop] = None
) -> InterruptableFuture:
    """
    Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it.

    :param coro: the coroutine that should be made interruptable with object of InterutableFuture as last paramenter
    :param loop: the event loop in which to run the coroutine, by default uses asyncio.get_event_loop()
    :return: an InterruptableFuture
    """

    loop = loop or asyncio.get_event_loop()
    future = InterruptableFuture()

    async def execute_coroutine():
        """Coroutine that wraps the original coroutine and sets it result on the future only if not already set."""
        try:
            result = await coro(future)
        except Exception as exception:  # pylint: disable=broad-except
            if not future.done():
                future.set_exception(exception)
            else:
                LOGGER.warning(
                    'Interruptable future set to %s before its coro %s is done. %s', future.result(), coro.__name__,
                    str(exception)
                )
        else:
            # If the future has not been set elsewhere, i.e. by the interrupt call, by the time that the coroutine
            # is executed, set the future's result to the result of the coroutine
            if not future.done():
                future.set_result(result)

    loop.create_task(execute_coroutine())

    return future

whereby an asyncio.CancelledError will be caught by except Exception in python 3.7 but not 3.8

@ltalirz
Copy link
Member

ltalirz commented Mar 4, 2021

Another potential place, is in aiida/engine/utils.py:

I've checked, and adding the asyncio.CancelledError here does not change anything for me under python 3.8.

I'm wondering whether the change in behavior of the CancelledError is perhaps a red herring... and whether we perhaps rather need to look into concurrent?
Given that this is the stack trace:

Traceback (most recent call last):
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/site-packages/kiwipy/rmq/tasks.py", line 166, in _on_task
    result = await result
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/asyncio/futures.py", line 263, in __await__
    yield self  # This tells Task to wait for completion.
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/asyncio/futures.py", line 181, in result
    raise self._exception
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/site-packages/kiwipy/rmq/threadcomms.py", line 253, in done
    result = kiwi_future.result()
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/site-packages/kiwipy/futures.py", line 54, in capture_exceptions
    yield
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/site-packages/plumpy/communications.py", line 48, in on_done
    result = plum_future.result()
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/asyncio/futures.py", line 181, in result
    raise self._exception
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/site-packages/kiwipy/futures.py", line 54, in capture_exceptions
    yield
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/site-packages/plumpy/futures.py", line 73, in run_task
    res = await coro()
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/site-packages/plumpy/process_comms.py", line 539, in __call__
    return await self._continue(communicator, **task.get(TASK_ARGS, {}))
  File "/root/aiida-core/aiida/manage/external/rmq.py", line 204, in _continue
    return future.result()
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/root/miniconda3/envs/aiida3710/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
aiida.engine.exceptions.PastException: concurrent.futures._base.CancelledError

I found the following in the python 3.7.10 changelog

bpo-33110: Handle exceptions raised by functions added by concurrent.futures add_done_callback correctly when the Future has already completed.

but I tried python 3.7.10 and it does not fix the issue.

@chrisjsewell
Copy link
Member

stop the clock, I have a diagnosis + fix...

@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

ok so (deep breath)
first doing the replacement in tasks.py of plumpy.futures.CancelledError -> plumpy.futures.CancelledError, asyncio.CancelledError, then adding some additional debug logging, then running:

verdi run submit.py
verdi daemon start; sleep 2; verdi daemon stop

In python 3.8 daemon log:

03/04/2021 04:48:28 PM <95126> aiida.engine.daemon.runner: [INFO] Starting a daemon runner
03/04/2021 04:48:28 PM <95126> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [INFO] Loaded process<145> from saved state
03/04/2021 04:48:28 PM <95126> aiida.engine.persistence: [DEBUG] Persisting process<145>
03/04/2021 04:48:28 PM <95126> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [INFO] Process<145>: Broadcasting state change: state_changed.created.running
03/04/2021 04:48:28 PM <95126> aiida.engine.persistence: [DEBUG] Persisting process<145>
03/04/2021 04:48:28 PM <95126> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [INFO] Process<145>: Broadcasting state change: state_changed.running.waiting
03/04/2021 04:48:28 PM <95126> aiida.engine.processes.calcjobs.tasks: [INFO] scheduled request to upload CalcJob<145>
03/04/2021 04:48:28 PM <95126> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [DEBUG] Process<145>: received broadcast message 'state_changed.created.running' with communicator '<kiwipy.rmq.communicator.RmqSubscriber object at 0x7f8242c389a0>': None
03/04/2021 04:48:28 PM <95126> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [DEBUG] Process<145>: received broadcast message 'state_changed.running.waiting' with communicator '<kiwipy.rmq.communicator.RmqSubscriber object at 0x7f8242c389a0>': None
03/04/2021 04:48:31 PM <95126> aiida.engine.daemon.runner: [INFO] Received signal to shut down the daemon runner
03/04/2021 04:48:31 PM <95126> aiida.engine.processes.calcjobs.tasks: [DEBUG] interrupting upload: <class 'asyncio.exceptions.CancelledError'>::
03/04/2021 04:48:31 PM <95126> aiida.engine.processes.calcjobs.tasks: [DEBUG] waiting retrieved interruption: <class 'asyncio.exceptions.CancelledError'>:
03/04/2021 04:48:31 PM <95126> aiida.engine.daemon.runner: [INFO] Daemon runner stopped
03/04/2021 04:48:31 PM <95126> aiida.engine.daemon.runner: [INFO] Daemon runner started

In python 3.7 daemon log:

03/04/2021 04:56:21 PM <99604> aiida.engine.daemon.runner: [INFO] Starting a daemon runner
03/04/2021 04:56:21 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [INFO] Loaded process<160> from saved state
03/04/2021 04:56:21 PM <99604> aiida.engine.persistence: [DEBUG] Persisting process<160>
03/04/2021 04:56:21 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [INFO] Process<160>: Broadcasting state change: state_changed.created.running
03/04/2021 04:56:21 PM <99604> aiida.engine.persistence: [DEBUG] Persisting process<160>
03/04/2021 04:56:21 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [INFO] Process<160>: Broadcasting state change: state_changed.running.waiting
03/04/2021 04:56:21 PM <99604> aiida.engine.processes.calcjobs.tasks: [INFO] scheduled request to upload CalcJob<160>
03/04/2021 04:56:21 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [DEBUG] Process<160>: received broadcast message 'state_changed.created.running' with communicator '<kiwipy.rmq.communicator.RmqSubscriber object at 0x7f4cb86fa390>': None
03/04/2021 04:56:21 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [DEBUG] Process<160>: received broadcast message 'state_changed.running.waiting' with communicator '<kiwipy.rmq.communicator.RmqSubscriber object at 0x7f4cb86fa390>': None
03/04/2021 04:56:24 PM <99604> aiida.engine.daemon.runner: [INFO] Received signal to shut down the daemon runner
03/04/2021 04:56:24 PM <99604> aiida.engine.processes.calcjobs.tasks: [DEBUG] waiting retrieved interruption: <class 'concurrent.futures._base.CancelledError'>:
03/04/2021 04:56:24 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [REPORT] [160|ArithmeticAddCalculation|on_except]: Traceback (most recent call last):
  File "/root/miniconda3/envs/aiida/lib/python3.7/site-packages/plumpy/processes.py", line 1182, in step
    next_state = await self._run_task(self._state.execute)
  File "/root/miniconda3/envs/aiida/lib/python3.7/site-packages/plumpy/processes.py", line 548, in _run_task
    result = await coro(*args, **kwargs)
  File "/root/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 372, in execute
    skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue)
  File "/root/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 431, in _launch_task
    result = await self._task
  File "/root/miniconda3/envs/aiida/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
concurrent.futures._base.CancelledError

03/04/2021 04:56:24 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [INFO] Process<160>: Broadcasting state change: state_changed.waiting.excepted
03/04/2021 04:56:24 PM <99604> aiida.engine.transports: [ERROR] Exception whilst using transport <class 'concurrent.futures._base.CancelledError'>:
Traceback (most recent call last):
  File "/root/aiida-core/aiida/engine/transports.py", line 110, in request_transport
    yield transport_request.future
  File "/root/aiida-core/aiida/engine/processes/calcjobs/tasks.py", line 79, in do_upload
    transport = await cancellable.with_interrupt(request)
  File "/root/aiida-core/aiida/engine/utils.py", line 95, in with_interrupt
    result = await next(wait_iter)
  File "/root/miniconda3/envs/aiida/lib/python3.7/asyncio/tasks.py", line 556, in _wait_for_one
    f = await done.get()
  File "/root/miniconda3/envs/aiida/lib/python3.7/asyncio/queues.py", line 159, in get
    await getter
  File "/root/miniconda3/envs/aiida/lib/python3.7/asyncio/futures.py", line 263, in __await__
    yield self  # This tells Task to wait for completion.
  File "/root/miniconda3/envs/aiida/lib/python3.7/asyncio/tasks.py", line 318, in __wakeup
    future.result()
  File "/root/miniconda3/envs/aiida/lib/python3.7/asyncio/futures.py", line 176, in result
    raise CancelledError
concurrent.futures._base.CancelledError

03/04/2021 04:56:24 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [DEBUG] exponential_backoff_retry encountered exception: <class 'concurrent.futures._base.CancelledError'>::
03/04/2021 04:56:24 PM <99604> aiida.engine.processes.calcjobs.tasks: [DEBUG] interrupting upload: <class 'concurrent.futures._base.CancelledError'>::
03/04/2021 04:56:24 PM <99604> aiida.orm.nodes.process.calculation.calcjob.CalcJobNode: [DEBUG] Process<160>: received broadcast message 'state_changed.waiting.excepted' with communicator '<kiwipy.rmq.communicator.RmqSubscriber object at 0x7f4cb86fa390>': None
03/04/2021 04:56:24 PM <99604> aiida.engine.daemon.runner: [INFO] Daemon runner stopped
03/04/2021 04:56:24 PM <99604> aiida.engine.daemon.runner: [INFO] Daemon runner started

From this (and the fix) I'm 90% sure that Task.cancel excepts with concurrent.futures.CancelledError in 3.7 and asyncio.CancelledError in 3.8+, which appears entirely undocumented!
Edit: before 3.8 asyncio.CancelledError == concurrent.futures.CancelledError

We cancel all the tasks before closing the daemon here:

for task in tasks:
task.cancel()

For asyncio.CancelledError in 3.8+, this bypasses all our exception catching (since it inherits from BaseException and not Exception) which, through luck or judgement, works in general, except its not caught here:

except (plumpy.process_states.Interruption, plumpy.futures.CancelledError):
node.set_process_status(f'Transport task {command} was interrupted')
raise

and so the process status is not changed from "Waiting for transport task: upload"

For concurrent.futures.CancelledError in 3.7, I've identified 3 critical places where you need to catch and raise this exception for the process to not be set to Excepted, i.e. adding this before

            except asyncio.CancelledError:
                raise

https://github.com/aiidateam/plumpy/blob/db0bf6033aa3a9b69e0e5b30206df05135538fd7/plumpy/processes.py#L1196

except Exception:

except Exception as exception:

@ltalirz
Copy link
Member

ltalirz commented Mar 4, 2021

From this (and the fix) I'm 90% sure that Task.cancel excepts with concurrent.futures.CancelledError in 3.7 and asyncio.CancelledError in 3.8+, which appears entirely undocumented!

Well, as mentioned in the python 3.8 changelog that you linked, in python3.7 asyncio.CancelledError used to inherit from concurrent.futures.CancelledError, see

python/cpython@431b540#diff-5db3920ed2c2a4091ac1f9272983fbfd41df66f0cee777f8715b354102f64e36

I.e. the asyncio.CancelledError is thrown in both versions, but only in python 3.7 it is also a concurrent.futures.CancelledError (which inherits from Error which inherits from Exception).

As for the exception handling, the interesting question is now: which of the except blocks should be triggered by a asyncio.CancelledError (also in python 3.8) and which shouldn't?
I have the feeling a PR is in the making? ;-)

@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

I.e. the asyncio.CancelledError is thrown in both versions, but only in python 3.7 it is also a concurrent.futures.CancelledError

thats not correct though, the asyncio.CancelledError is not thrown in python 3.7, thats clear from the daemon log, it is specifically throwing concurrent.futures.CancelledError

As for the exception handling, the interesting question is now: which of the except blocks should be triggered by a asyncio.CancelledError (also in python 3.8) and which shouldn't?
I have the feeling a PR is in the making? ;-)

Oh yes thats what I'm doing now, #4648 (comment) details exactly what needs to be changed

@ltalirz
Copy link
Member

ltalirz commented Mar 4, 2021

thats not correct though, the asyncio.CancelledError is not thrown in python 3.7, thats clear from the daemon log, it is specifically throwing concurrent.futures.CancelledError

Well, slightly before the change linked above you have this:
python/cpython@0baa72f#diff-09dfaf886af00c47cf9afe044bff6e7ccdade7c085b0f0b7684258b86d2a0cf5

I.e. at that point, asyncio.CancelledError was concurrent.futures.CancelledError

@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

Ah fair, so:

The exception asyncio.CancelledError now inherits from BaseException rather than Exception and no longer inherits from concurrent.futures.CancelledError.

is pretty misleading then, because it does not inherit from concurrent.futures.CancelledError, it is concurrent.futures.CancelledError

@chrisjsewell
Copy link
Member

I have the feeling a PR is in the making? ;-)

but yeh I will be making PRs shortly 👍

@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

As for the exception handling, the interesting question is now: which of the except blocks should be triggered by a asyncio.CancelledError (also in python 3.8) and which shouldn't?

well in aiida-core mostly none of them I guess, because we just want them to "pass straight through". The only reason in aiida-core to catch the concurrent.futures.CancelledError is to filter them from the Exception catch, which is no longer necessary with asyncio.CancelledError inheriting only from BaseException.
Its just the catch to set the process status, before re-raising.

In kiwipy though concurrent.futures.CancelledError is caught and processed in specific ways (without re-reaising), so this may well also need to catch asyncio.CancelledError (although this does not seem directly related to this issue)

@chrisjsewell
Copy link
Member

chrisjsewell commented Mar 4, 2021

In kiwipy though concurrent.futures.CancelledError is caught and processed in specific ways (without re-reaising), so this may well also need to catch asyncio.CancelledError (although this does not seem directly related to this issue)

cc @muhrin for comment

@chrisjsewell
Copy link
Member

First part of the fix: aiidateam/plumpy#214

@chrisjsewell
Copy link
Member

then the second part is #4792 (@ltalirz feel free to give a "pre-review")

@muhrin
Copy link
Contributor

muhrin commented Mar 8, 2021

In kiwipy though concurrent.futures.CancelledError is caught and processed in specific ways (without re-reaising), so this may well also need to catch asyncio.CancelledError (although this does not seem directly related to this issue)

Thanks Chris. I'd have to a look a bit more deeply but kiwipy.CancelledErrors (i.e. concurrent cancelled exceptions) are part of the kiwipy API in the sense that a user is allowed to raise them to, for example, cancel their processing of a task. They are not designed to be interchangable with asyncio CancelledErrors which should just be treated as a standard exception (rather than a direct action by the user). At least this is how the API is now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

8 participants