"maximum recursion depth reached" when submitting ~200 workchains #4876
Does this come from the submission script or the daemon? Looks to be on the daemon side, correct? |
Sorry, forgot to mention: it comes from the submission script, not from the daemon logs |
It may also be helpful to provide the output of |
Are you sure? Is he running the workchains in that script or sending them to the daemon? The reason I am asking is that the stack trace shows that the problem originates from the |
@sphuber The error comes from inspecting the report ( @chrisjsewell is this enough?
Thank you for the help! |
Thanks for the additional info @danieleongari. It would be very useful if we could actually get the version of
So as expected, the problem is on the daemon worker side and not the submit script. What is happening is that the daemon worker receives a task from the process queue on RabbitMQ, loads the corresponding node from the database and then uses the YAML dump in the ... from the ... The only hint there is, is that the final method in the stack trace comes from the ... (A rough sketch of this continue flow is given below.) |
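Roughly, the continue flow described above looks like the sketch below; the helper names are hypothetical and only stand in for the actual aiida-core machinery, they are not its API.

async def continue_task(pk, load_node, recreate_process):
    # a daemon worker pulled a "continue" task for process <pk> from the RabbitMQ process queue
    node = load_node(pk)                          # load the process node from the database
    process = recreate_process(node.checkpoint)   # rebuild the in-memory Process from its YAML checkpoint
    return await process.step_until_terminated()  # step it to completion on the worker's event loop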
sorry for the hiccup with the submission script vs daemon - there was a miscommunication on our side |
I'm having a somewhat similar issue, so I'll add my case to this one. When submitting ~100 work chains, a lot of them seem to get stuck in the running state:

$ verdi process list -S running
PK Created Process label Process State Process status
----- --------- --------------- --------------- ----------------
36774 1h ago PwBaseWorkChain ⏵ Running
36783 1h ago PwBaseWorkChain ⏵ Running
...
37166 1h ago PwBaseWorkChain ⏵ Running
Total results: 49

(Note that I've already deleted and restarted one manually.) The logs for these are completely empty:

$ verdi process report 37156
No log messages recorded for this entry

But a bit of digging through the daemon logs gives me the following trace:

05/03/2021 08:52:17 PM <16823> kiwipy.rmq.tasks: [ERROR] Exception occurred while processing task.
Traceback (most recent call last):
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/plumpy/utils.py", line 128, in __getattr__
return self[attr]
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/plumpy/utils.py", line 85, in __getitem__
return self._dict[key]
KeyError: 'kpoints'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/mbercx/envs/aiida-sirius/code/aiida-quantumespresso/aiida_quantumespresso/workflows/pw/base.py", line 247, in validate_kpoints
kpoints = self.inputs.kpoints
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/plumpy/utils.py", line 131, in __getattr__
raise AttributeError(errmsg)
AttributeError: 'AttributesFrozendict' object has no attribute 'kpoints'

These two are basically repeated until a RecursionError is hit:

Traceback (most recent call last):
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/kiwipy/rmq/tasks.py", line 166, in _on_task
result = await result
File "/usr/lib/python3.8/asyncio/futures.py", line 257, in __await__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.8/asyncio/tasks.py", line 349, in __wakeup
future.result()
File "/usr/lib/python3.8/asyncio/futures.py", line 175, in result
raise self._exception
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/kiwipy/rmq/threadcomms.py", line 253, in done
result = kiwi_future.result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
raise self._exception
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/kiwipy/futures.py", line 54, in capture_exceptions
yield
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/plumpy/communications.py", line 48, in on_done
result = plum_future.result()
File "/usr/lib/python3.8/asyncio/futures.py", line 175, in result
raise self._exception
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/kiwipy/futures.py", line 54, in capture_exceptions
yield
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/plumpy/futures.py", line 73, in run_task
res = await coro()
File "/home/mbercx/.virtualenvs/aiida-sirius/lib/python3.8/site-packages/plumpy/process_comms.py", line 539, in __call__
return await self._continue(communicator, **task.get(TASK_ARGS, {}))
File "/home/mbercx/envs/aiida-sirius/code/aiida-core/aiida/manage/external/rmq.py", line 219, in _continue
self.handle_continue_exception(node, exception, message)
File "/home/mbercx/envs/aiida-sirius/code/aiida-core/aiida/manage/external/rmq.py", line 158, in handle_continue_exception
node.logger.exception(message)
File "/usr/lib/python3.8/logging/__init__.py", line 1814, in exception
self.log(ERROR, msg, *args, exc_info=exc_info, **kwargs)
File "/usr/lib/python3.8/logging/__init__.py", line 1829, in log
self.logger.log(level, msg, *args, **kwargs)
File "/usr/lib/python3.8/logging/__init__.py", line 1500, in log
self._log(level, msg, args, **kwargs)
File "/usr/lib/python3.8/logging/__init__.py", line 1577, in _log
self.handle(record)
File "/usr/lib/python3.8/logging/__init__.py", line 1587, in handle
self.callHandlers(record)
File "/usr/lib/python3.8/logging/__init__.py", line 1649, in callHandlers
hdlr.handle(record)
File "/usr/lib/python3.8/logging/__init__.py", line 950, in handle
self.emit(record)
File "/usr/lib/python3.8/logging/__init__.py", line 1081, in emit
msg = self.format(record)
File "/usr/lib/python3.8/logging/__init__.py", line 925, in format
return fmt.format(record)
File "/usr/lib/python3.8/logging/__init__.py", line 672, in format
record.exc_text = self.formatException(record.exc_info)
File "/usr/lib/python3.8/logging/__init__.py", line 622, in formatException
traceback.print_exception(ei[0], ei[1], tb, None, sio)
File "/usr/lib/python3.8/traceback.py", line 103, in print_exception
for line in TracebackException(
File "/usr/lib/python3.8/traceback.py", line 493, in __init__
context = TracebackException(
File "/usr/lib/python3.8/traceback.py", line 493, in __init__
context = TracebackException(
File "/usr/lib/python3.8/traceback.py", line 493, in __init__
context = TracebackException(
[Previous line repeated 34 more times]
File "/usr/lib/python3.8/traceback.py", line 476, in __init__
_seen.add(id(exc_value))
RecursionError: maximum recursion depth exceeded while calling a Python object

After deleting one and restarting it, it ran just fine. I've also started ~35 work chains like this just fine. I then tried deleting the remaining 49 and restarting them all at once, and then I wind up with 7 of them stuck again; the daemon log shows:

RecursionError: maximum recursion depth exceeded while calling a Python object
05/03/2021 10:28:13 PM <17394> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [37947|PwBaseWorkChain|run_process]: launching PwCalculation<37961> iteration #1
05/03/2021 10:28:14 PM <17394> aiida.orm.nodes.process.workflow.workchain.WorkChainNode: [REPORT] [37849|PwBaseWorkChain|run_process]: launching PwCalculation<37964> iteration #1
...

Afterwards, I deleted all 100 work chains, and then restarted them with a 5 second pause in between. Now all work chains were able to create the ...

I'm running:

OS: Ubuntu 18.04.5 LTS |
Seeing the same thing, also when submitting a large number of workchains (whether one-by-one in a script or via a parent workchain launching them does not matter, but when launching them via a parent workchain the issue is triggered sooner). The backtrace is a bit different, though:
|
Just want to say that I previously did a similar thing, submitting the work chains (~100 VaspRelaxWorkChain) in a loop without delay (aiida-core < 1.6.1), and found no issue. In my case the submission process itself is rather slow though; it takes about 1.5 seconds for each call to |
And another one, again at a different place (but this one comes from the sub-workchain called by the primary one):
|
I took the liberty to add the |
@danieleongari is the computer to run the calculations configured with an |
Just to report the same issue, submitting many common workflows. I see two different behaviours (probably because of the different places where the recursion limit is hit):
The |
@chrisjsewell can you please look into this? This seems like a blocker and important issue... |
Indeed 👍 I’m going to do a bunch of aiida-core stuff next week |
Not sure. Looking at the original example in the OP, it happens when a |
BTW:
Since I don't think there is any sensitive information, I attach the full log (I started running from a clean profile yesterday, so the log should contain all info, not cluttered with too much else): daemon.log.zip |
When performing a FLEUR scandium relaxation I ran into this error, which Vasily described yesterday in the mailing list. Here is a full traceback:
This and similar tracebacks occur on various points of the work chain. |
Thanks, I think this error may be different from the one opened for this issue.

aiida-core/aiida/orm/nodes/node.py, lines 455 to 456 in 4174e5d
something like:

try:
    count = builder.count()
except Exception as exc:
    raise ValueError(f'the link ({source} -> {self}) would result in an erroneous query') from exc

if count > 0:
    raise ValueError(f'the link you are attempting to create ({source} -> {self}) would generate a cycle in the graph')

at least then we could see what nodes are trying to be linked |
I got a chance to reproduce this issue and I added the exception trace as @chrisjsewell suggested. Here are the traces I got. It is basically lots of kpoints KeyErrors followed by
|
Oh exactly, that is what I can't understand 😅 |
I think I may have a lead. Thinking about why the asyncio tasks seem to be sharing stacks even though they shouldn't, I remembered that ...

RuntimeError: This event loop is already running

In the examples of myself, @mbercx and @unkcpz, which are all running ... (a minimal illustration of this RuntimeError is sketched below). I think this is a promising avenue to look into further, although I am not quite sure exactly how to debug the frames that the process function execution adds. Ideally, if we find out this is the problem, we should really find a way to refactor process functions such that they no longer need |
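For context, here is a minimal (non-AiiDA) illustration of the error that nest_asyncio is used to work around: plain asyncio refuses a nested run_until_complete() while the loop is already running, and nest_asyncio lifts that restriction by re-entering the loop on top of the current call stack.

import asyncio

def sync_helper():
    # a synchronous function (think: a process function) that wants to run a coroutine to completion
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(asyncio.sleep(0))

async def main():
    sync_helper()  # raises: RuntimeError: This event loop is already running

asyncio.run(main())
# With nest_asyncio.apply(), the nested run_until_complete() succeeds instead,
# but it re-enters the loop on the same stack, which is how frames pile up.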
I tried running the |
I am very curious to see how you make this happen. 🤔 |
Note, I can't reproduce any issue when adding:

import nest_asyncio
nest_asyncio.apply()

import asyncio
import sys

sys.setrecursionlimit(70)

number_of_tasks = 10
recursion_depth = 100

async def recursive_task(task_name, counter):
    if counter <= 0:
        return
    print(f"Task {task_name} - Counter: {counter}")
    # Simulating some asynchronous work
    await asyncio.sleep(.1)
    # Recursive call
    await recursive_task(task_name, counter - 1)

async def main():
    tasks = [
        asyncio.create_task(recursive_task(f"Task {i}", recursion_depth))
        for i in range(number_of_tasks)
    ]
    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

asyncio.run(main())

This is certainly not to say that |
@sphuber can you provide the stack trace for this, to see the code path that leads up to "re-entry" |
@chrisjsewell but that is because you are not actually using the functionality of ... Take the following example that simulates what AiiDA is doing:

#!/usr/bin/env python
import nest_asyncio
nest_asyncio.apply()

import asyncio
import sys

sys.setrecursionlimit(70)

number_of_tasks = 2
recursion_depth = 10

async def recursive_task(task_name, counter):
    if counter <= 0:
        return
    print(f"Task {task_name} - Counter: {counter}")
    asyncio.get_event_loop().run_until_complete(recursive_task(task_name, counter - 1))

async def main():
    tasks = [
        asyncio.create_task(recursive_task(f"Task {i}", recursion_depth))
        for i in range(number_of_tasks)
    ]
    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

asyncio.run(main())

This will except with the recursion error almost instantly. |
What you do is remove the application of nest_asyncio, and then replace

result = process.execute()

with

runner.loop.create_task(process.step_until_terminated())

and on line 188 replace

return result, process.node

with

return process.node.get_outgoing().all_nodes()[0], process.node

You should now be able to run a workchain like:

from aiida import engine, orm, plugins
from ase.build import bulk  # needed for the example structure below

# `code` is assumed to be an already configured Quantum ESPRESSO pw.x Code
structure = orm.StructureData(ase=bulk('Si', 'diamond', a=5.43))
builder = plugins.WorkflowFactory('quantumespresso.pw.base').get_builder_from_protocol(code, structure, protocol='fast')
results, node = engine.run_get_node(builder)

If you only disable the ... As I said, this is a real hack and is not a real solution, but at least I think it demonstrates that nest-asyncio is the problem, because with this hack I can run as many workchains over the daemon as I want without hitting the recursion error. |
I don't think there is really a "bug" in the stack handling with ...

In summary, I only see the following possible solutions:
The last one seems counter to the concept of asyncio though, and I am not sure if it is even possible. Essentially what we want there is that the calcfunction should be completed synchronously. |
Here is another nasty workaround that we could consider putting in place in case we cannot solve this problem properly. We could think of increasing the recursion limit dynamically whenever a process function is about to be executed and the current frame count comes close to the current limit (a rough sketch of this check is given below). Of course, if this were to go on without end, we would eventually hit a stack overflow. We could add an additional hard limit above which we won't go, and maybe at that point, instead of executing the process function, we could relinquish control to the event loop so it can try and do other stuff, ideally finishing other process functions that had already been started and so freeing up frames on the stack. |
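A rough sketch of what such a check could look like, assuming we bump the limit by a fixed delta whenever the stack gets close to it (hypothetical helper, not the final implementation):

import sys

def ensure_stack_headroom(frame_delta: int = 1000, margin: int = 200) -> None:
    # count the frames currently on the interpreter stack
    depth = 0
    frame = sys._getframe()
    while frame is not None:
        depth += 1
        frame = frame.f_back

    limit = sys.getrecursionlimit()
    # if we are within ``margin`` frames (or above 80%) of the limit, raise it
    if depth > min(0.8 * limit, limit - margin):
        sys.setrecursionlimit(limit + frame_delta)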
Hi all, first of all huge thanks to all of you! Now, since I wasn't deep into this, I tried to give it an external look from a different point of view to understand the problem, while reading the docs of ... Reading a bit further, I was lucky to find a (very simple!) solution by the developer of SQLAlchemy: a workaround to have a coroutine call a normal function that in turn would like to await a coroutine. It's very simple (<30 lines) and requires minimal change to the code, see also below or the linked GitHub gist in the comment of the code below. I report below a modified version of the code you guys wrote above, which crashes with a few (not too many, only 20) not-too-deep recursive calls:
I then rewrote it using the suggested trick and it works fine, even if I run 20000 different tasks! Here is the code of the failing one, using nest_asyncio:

#!/usr/bin/env python
import nest_asyncio
nest_asyncio.apply()

import asyncio
import sys

sys.setrecursionlimit(70)

# Running already 20 not-too-deep recursions crashes badly
# Note: we are calling a recursive co-routine that calls a routine that internally calls again the initial coroutine
number_of_tasks = 20
recursion_depth = 5

async def recursive_task(task_name, counter):
    if counter <= 0:
        return
    print(f"Task {task_name} - Counter: {counter}")
    intermediate_function(task_name, counter)

def intermediate_function(task_name, counter):
    asyncio.get_event_loop().run_until_complete(recursive_task(task_name, counter - 1))

async def main():
    tasks = [
        asyncio.create_task(recursive_task(f"Task {i}", recursion_depth))
        for i in range(number_of_tasks)
    ]
    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

asyncio.run(main())

And here is the code that works (I put 200 tasks because the printing is slow, but if you comment out the printing you'll see it works even with 20000 and more):

#!/usr/bin/env python
import asyncio
import sys

########################################################################
### START SOLUTION FROM zzzeek ###
# From https://gist.github.com/zzzeek/4e89ce6226826e7a8df13e1b573ad354
import greenlet

def await_(coroutine):
    current = greenlet.getcurrent()
    if not isinstance(current, AsyncIoGreenlet):
        raise Exception(
            "not running inside a greenlet right now, "
            "can't use await_() function"
        )
    return current.driver.switch(coroutine)

class AsyncIoGreenlet(greenlet.greenlet):
    def __init__(self, driver, fn):
        greenlet.greenlet.__init__(self, fn, driver)
        self.driver = driver

async def greenlet_spawn(__fn, *args, **kw):
    target = AsyncIoGreenlet(greenlet.getcurrent(), __fn)
    target_return = target.switch(*args, **kw)

    while target:
        try:
            result = await target_return
        except:
            target_return = target.throw(*sys.exc_info())
        else:
            target_return = target.switch(result)

    # clean up cycle for the common case
    # (gc can do the exception case)
    del target.driver
    return target_return
### END SOLUTION FROM zzzeek ###
########################################################################

sys.setrecursionlimit(70)

# Here I set a lot of concurrent tasks. This never creates problems.
number_of_tasks = 200
recursion_depth = 5

async def recursive_task(task_name, counter):
    if counter <= 0:
        return
    #print(f"Task {task_name} - Counter: {counter}")
    await greenlet_spawn(intermediate_function, task_name, counter)

def intermediate_function(task_name, counter):
    await_(recursive_task(task_name, counter - 1))

async def main():
    tasks = [
        asyncio.create_task(recursive_task(f"Task {i}", recursion_depth))
        for i in range(number_of_tasks)
    ]
    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

asyncio.run(main())

I'd be curious to know if this can solve our issue with minimal changes to the AiiDA code, or if this is not something we can use to fix this issue. |
Interesting, @giovannipizzi, thanks for that. I will give this a go soon with the test I was running and see if it works. One concern I have is that, reading the docs of greenlet, it seems they are working with threads, and normally that is a no-go for AiiDA as it is currently written. The SqlA engine is not set up correctly for that in |
From a brief read of https://greenlet.readthedocs.io/en/latest/ they just compare to threads to explain the difference, but they say that greenlets are lightweight coroutines without the requirement of Python language support, can also work with C code, and do cooperative scheduling, so I don't think we have thread issues (a tiny cooperative-switching demo is sketched below). |
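To illustrate that last point, here is a tiny self-contained greenlet demo (nothing AiiDA-specific): two greenlets hand control to each other explicitly on the same thread, so nothing runs unless it is switched to.

from greenlet import greenlet

def task_a():
    print('A: start')
    gr_b.switch()      # explicitly hand control to B
    print('A: resumed')

def task_b():
    print('B: start')
    gr_a.switch()      # hand control back to A; B is simply never resumed

gr_a = greenlet(task_a)
gr_b = greenlet(task_b)
gr_a.switch()          # prints: A: start, B: start, A: resumed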
Thanks @giovannipizzi, although yeh I would certainly be cautious here; although it looks like only a few lines of code, you are literally adding a whole new concurrency model (one that is not "native" to Python), which could obviously come with its own issues and can be difficult to debug.
Indeed we should be very clear on what we are trying to achieve, in order to understand the tradeoffs in solutions |
I think this is the problem: sometimes this is what we are doing, but not always. We can sum the situation up as follows:
This leads to the following scenarios:

(1) sync -> async
(2) ...

In (1) the outer call context is synchronous: the user simply calls the process function as a normal synchronous Python function. In scenario (2), however, the caller, for example a workchain that is run by a daemon runner, is itself called asynchronously. This is also where the need for ... comes in. This leads to the problem of trying to implement the ...

Essentially what happens currently when a process function is called is:
To me it is not clear where the call to |
@chrisjsewell you are totally right - here I'm just trying to see if this might work. If it does, we should discuss whether we should do it. However, the current design of the engine requires features not supported by native asyncio, so one way or another we need to do something different (currently, nest-asyncio), so I feel that the solution might work, especially (to be double checked) if some of our libraries are using it anyway (e.g. SQLAlchemy). |
@sphuber I indeed had thought of this after posting my comment. Each recursive task has a random small (~0.01s) slowdown to allow potential concurrent execution, to prove that things are not just running serially. Not sure if this really reproduces what we do in AiiDA... but it seems to work? (Again, good to have a double check that I'm not doing something very stupid.)

#!/usr/bin/env python
import asyncio
import sys
import json
########################################################################
### START SOLUTION FROM zzzeek ###
# From https://gist.github.com/zzzeek/4e89ce6226826e7a8df13e1b573ad354
import greenlet
def await_(coroutine):
current = greenlet.getcurrent()
if not isinstance(current, AsyncIoGreenlet):
raise Exception(
"not running inside a greenlet right now, "
"can't use await_() function"
)
return current.driver.switch(coroutine)
class AsyncIoGreenlet(greenlet.greenlet):
def __init__(self, driver, fn):
greenlet.greenlet.__init__(self, fn, driver)
self.driver = driver
async def greenlet_spawn(__fn, *args, **kw):
target = AsyncIoGreenlet(greenlet.getcurrent(), __fn)
target_return = target.switch(*args, **kw)
while target:
try:
result = await target_return
except:
target_return = target.throw(*sys.exc_info())
else:
target_return = target.switch(result)
# clean up cycle for the common case
# (gc can do the exception case)
del target.driver
return target_return
### END SOLUTION FROM zzzeek ###
########################################################################
def append_task(name, val):
try:
with open('tasks.json') as f:
tasks = json.load(f)
except IOError:
tasks = []
tasks.append([name, val])
with open('tasks.json', 'w') as f:
json.dump(tasks, f)
def pop_task():
try:
with open('tasks.json') as f:
tasks = json.load(f)
except IOError:
return None
try:
name, val = tasks.pop(0) # FIFO
except IndexError:
return None
with open('tasks.json', 'w') as f:
json.dump(tasks, f)
return name, val
def await_or_new_loop(coroutine):
current = greenlet.getcurrent()
if not isinstance(current, AsyncIoGreenlet):
print("creating main loop")
return asyncio.get_event_loop().run_until_complete(coroutine)
else:
print("reentring with greenlets")
return current.driver.switch(coroutine)
sys.setrecursionlimit(70)
# Here I set a lot of concurrent tasks. This never creates problems.
number_of_tasks = 3
recursion_depth = 4
async def coro_executor(f, args, kwargs):
# give a chance to switch context
await asyncio.sleep(0)
await greenlet_spawn(f, *args, **kwargs)
def calcf(func):
def inner(*args, **kwargs):
await_or_new_loop(coro_executor(func, args, kwargs))
return inner
@calcf
def recursive(task_name, counter):
import time
import random
# Very small delay to give a chance to other steps to happen at the same time
time.sleep(random.random() * 0.01) # Random time between 0 and 0.1 s
print(f"Task {task_name} - Counter: {counter}")
if counter <= 0:
return
return recursive(task_name, counter - 1)
async def daemon_run():
daemon_loop_counter = 0
sleep_counter = 0
futures = []
while True:
daemon_loop_counter += 1
name_val = pop_task()
if name_val is None:
sleep_counter += 1
if sleep_counter < 5:
print("No tasks, daemon waiting")
if sleep_counter == 1:
# At some point, "randomly" (here after ~1 sec), new tasks arrive
for i in range(number_of_tasks):
append_task(f"New task {i}", recursion_depth)
await asyncio.sleep(1) # No tasks, wait 1 sec
continue
else:
print("Stopping deamon after 5 sec, I now await all")
await asyncio.gather(*futures)
break
# Run task
name, val = name_val
print(f"Deamon task to run: {name} {val}")
futures.append(asyncio.create_task(coro_executor(recursive, [name, val], {})))
print("Running in main code, no coroutines")
recursive('test task', 5)
print("Running again")
recursive('test task', 6)
# "Submit" tasks (DB/RMQ replaced with JSON file, enough for here, not good for
# multiprocessing with many daemons)
for i in range(number_of_tasks):
append_task(f"Task {i}", recursion_depth)
print()
print("Running in parallel from a 'daemon'")
asyncio.run(daemon_run())

Output:
|
OK, this does not work in Jupyter, but then we need to continue using nest_asyncio anyway, with at the top:

import nest_asyncio
nest_asyncio.apply()

but just for this use case, and this should happen only once, at the very top call of a calcfunction in a script (i.e. not in a daemon); for how I wrote the code above, getting the outer loop should happen only once (a conditional sketch of this is given below). If you just add the two lines of nest-asyncio, the script above will fail with a RecursionError, but no worries! This is simply because nest-asyncio adds quite a few calls. If you replace with a more reasonable limit:

sys.setrecursionlimit(500)

then you can also run with

number_of_tasks = 600
recursion_depth = 4

(so even more tasks than the recursion limit), and things work, and from the output it looks to me that things are happening in parallel. |
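One possible sketch of that "only for this use case" idea: fall back to nest_asyncio only when a loop is already running in the current thread (the Jupyter case), and use the plain event loop otherwise. This is just an illustration of the idea, not something tested against aiida-core.

import asyncio

def run_sync(coroutine):
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None  # no loop running: the plain-script case

    if running_loop is not None:
        # e.g. inside a Jupyter notebook: patch the loop so the nested
        # run_until_complete() below is allowed
        import nest_asyncio
        nest_asyncio.apply(running_loop)
        return running_loop.run_until_complete(coroutine)

    return asyncio.get_event_loop().run_until_complete(coroutine)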
Some additional reporting, to try to convince you that the greenlet approach is better than the current nest_asyncio + increase-of-stack-size workaround (which of course was fine to get a quick improvement); I still advocate that we should do some additional testing and move to the greenlet approach.

Point 1: supporting both approaches

We can implement it in a way that the greenlet approach is optional, as I show below. In this way, we can put it into the code optionally and ask people to activate it if they get stack overflow errors, if we are not really sure (or, I would say, I'd prefer the opposite: we turn it on by default and can revert to the current behaviour with some configuration option).

#!/usr/bin/env python
import asyncio
import sys
import itertools
import logging
#logging.basicConfig(level=logging.INFO)
import nest_asyncio
nest_asyncio.apply()
# Here I set a lot of concurrent tasks. This never creates problems.
START_RECURSION_LIMIT=2000
number_of_tasks = 40000
recursion_depth = 20
INCREASE_RECURSION_LIMIT = False
USE_GREENLET = True
sys.setrecursionlimit(START_RECURSION_LIMIT)
if USE_GREENLET:
import greenlet
class AsyncIoGreenlet(greenlet.greenlet):
def __init__(self, driver, fn):
greenlet.greenlet.__init__(self, fn, driver)
self.driver = driver
def await_or_new_loop(coroutine):
if USE_GREENLET:
current = greenlet.getcurrent()
if not isinstance(current, AsyncIoGreenlet):
logging.info("creating main loop")
return asyncio.get_event_loop().run_until_complete(coroutine)
else:
logging.info("reentring with greenlets")
return current.driver.switch(coroutine)
else:
return asyncio.get_event_loop().run_until_complete(coroutine)
async def greenlet_spawn(__fn, *args, **kw):
target = AsyncIoGreenlet(greenlet.getcurrent(), __fn)
target_return = target.switch(*args, **kw)
while target:
try:
result = await target_return
except:
target_return = target.throw(*sys.exc_info())
else:
target_return = target.switch(result)
# clean up cycle for the common case
# (gc can do the exception case)
del target.driver
return target_return
### END SOLUTION FROM zzzeek ###
########################################################################
async def spawn_as_coroutine(f, *args, **kwargs):
if USE_GREENLET:
await greenlet_spawn(f, *args, **kwargs)
else:
f(*args, **kwargs)
def get_stack_size(size: int = 2) -> int: # type: ignore[return]
frame = sys._getframe(size) # pylint: disable=protected-access
try:
for size in itertools.count(size, 8): # pylint: disable=redefined-argument-from-local
frame = frame.f_back.f_back.f_back.f_back.f_back.f_back.f_back.f_back # type: ignore[assignment,union-attr]
except AttributeError:
while frame:
frame = frame.f_back # type: ignore[assignment]
size += 1
return size - 1
async def recursive_task(task_name, counter):
if counter <= 0:
return
logging.info(f"Task {task_name} - Counter: {counter}")
await spawn_as_coroutine(intermediate_function, task_name, counter)
def intermediate_function(task_name, counter):
if INCREASE_RECURSION_LIMIT:
frame_delta = 1000
frame_count = get_stack_size()
stack_limit = sys.getrecursionlimit()
# If the current frame count is more than 80% of the stack limit, or comes within 200 frames, increase the
# stack limit by ``frame_delta``.
if frame_count > min(0.8 * stack_limit, stack_limit - 200):
logging.info(f"Old recursion limit = {stack_limit}, new = {stack_limit + frame_delta}")
sys.setrecursionlimit(stack_limit + frame_delta)
await_or_new_loop(recursive_task(task_name, counter - 1))
async def main():
tasks = [
asyncio.create_task(recursive_task(f"Task {i}", recursion_depth))
for i in range(number_of_tasks)
]
# Wait for all tasks to complete
await asyncio.gather(*tasks)
logging.info("Running in main code, no coroutines")
intermediate_function('test task 1', 5)
logging.info("Running again")
intermediate_function('test task 2', 6)
logging.info("")
logging.info("Running in parallel from a 'daemon'")
import time
t = time.monotonic()
asyncio.run(main())
print(f"{number_of_tasks=}; {recursion_depth=}; {START_RECURSION_LIMIT=}; {INCREASE_RECURSION_LIMIT=}; {USE_GREENLET=}; Elapsed time: {time.monotonic() - t} s")

Point 2: clarity of the stack in case of an exception

In the original discussion by zzzeek, they clearly mention that the exceptions properly bubble up. Also, I'd say that the current approach with nest_asyncio is worse, as we noted in this specific issue while debugging.

Point 3: performance

As the test results of the code above show (reported below), the greenlet approach works well even with 40000 tasks:

Point 4: robustness

The solution with greenlet is being used by robust libraries like SQLAlchemy, so I think it is quite robust (in the end we are just using the greenlet library, which is designed to do this). In addition, even with an automatic increase of the recursion limit, if we run 400 tasks in parallel (with a stack depth of only 10 each, quite reasonable) ... Note: if we activate greenlets, we need to disable (at least the fast version of) the automatic increase of the recursion limit, as accessing
So, in summary, I think we should give the greenlet approach a go. Opinions? |
If we can get rid of I will give this implementation a test run with an actual workload to look at performance and robustness. Note that if it works and we can get rid of I propose then that we go ahead with the v2.4 release, which is ready with the workaround that works sufficiently well for the time being, and then we work towards the better solution. Note that anyways I have modern releases of |
@giovannipizzi I tried running with I then tried running the greenlet solution with
I noticed that when launching the workflows, I see a lot of I am not quite sure what the logic is but it might just be how tasks get scheduled on the loop. It seems that when I comment out the |
Thanks @sphuber - do you have the changed code somewhere we can look at? Maybe the best is to find some time to look at this together (maybe even in person, it might be faster). Here it seems that ... Anyway, we can indeed release the workaround of increasing the stack size, and already schedule some time later to discuss this. |
BTW, I think my test code was done quickly and I didn't bother returning the results of awaited coroutines - so it was working with printing but not with getting function return values. Not sure if this is the problem, but definitely it might be if you just used my example above. Here is an updated code that also properly passes around return values (I think, from quick testing...):

#!/usr/bin/env python
import asyncio
import sys
import json
########################################################################
### START SOLUTION FROM zzzeek ###
# From https://gist.github.com/zzzeek/4e89ce6226826e7a8df13e1b573ad354
import greenlet
def await_(coroutine):
current = greenlet.getcurrent()
if not isinstance(current, AsyncIoGreenlet):
raise Exception(
"not running inside a greenlet right now, "
"can't use await_() function"
)
return current.driver.switch(coroutine)
class AsyncIoGreenlet(greenlet.greenlet):
def __init__(self, driver, fn):
greenlet.greenlet.__init__(self, fn, driver)
self.driver = driver
async def greenlet_spawn(__fn, *args, **kw):
target = AsyncIoGreenlet(greenlet.getcurrent(), __fn)
target_return = target.switch(*args, **kw)
while target:
try:
result = await target_return
except:
target_return = target.throw(*sys.exc_info())
else:
target_return = target.switch(result)
# clean up cycle for the common case
# (gc can do the exception case)
del target.driver
return target_return
### END SOLUTION FROM zzzeek ###
########################################################################
def append_task(name, val):
print(f">>> Appending new task '{name}' with val {val}")
try:
with open('tasks.json') as f:
tasks = json.load(f)
except IOError:
tasks = []
tasks.append([name, val])
with open('tasks.json', 'w') as f:
json.dump(tasks, f)
def pop_task():
try:
with open('tasks.json') as f:
tasks = json.load(f)
except IOError:
return None
try:
name, val = tasks.pop(0) # FIFO
except IndexError:
return None
with open('tasks.json', 'w') as f:
json.dump(tasks, f)
return name, val
def await_or_new_loop(coroutine):
current = greenlet.getcurrent()
if not isinstance(current, AsyncIoGreenlet):
print("creating main loop")
return asyncio.get_event_loop().run_until_complete(coroutine)
else:
print("reentring with greenlets")
return current.driver.switch(coroutine)
sys.setrecursionlimit(70)
# Here I set a lot of concurrent tasks. This never creates problems.
number_of_tasks = 3
recursion_depth = 4
async def coro_executor(f, args, kwargs):
# give a chance to switch context
await asyncio.sleep(0)
return await greenlet_spawn(f, *args, **kwargs)
def calcf(func):
def inner(*args, **kwargs):
return await_or_new_loop(coro_executor(func, args, kwargs))
return inner
@calcf
def recursive(task_name, counter):
import time
import random
# Very small delay to give a chance to other steps to happen at the same time
time.sleep(random.random() * 0.01) # Random time between 0 and 0.1 s
print(f"Task {task_name} - Counter: {counter}")
if counter <= 0:
return 0
return recursive(task_name, counter - 1) + 2
async def daemon_run():
daemon_loop_counter = 0
sleep_counter = 0
futures = []
in_vals = []
while True:
daemon_loop_counter += 1
name_val = pop_task()
if name_val is None:
sleep_counter += 1
if sleep_counter < 5:
print("No tasks, daemon waiting")
if sleep_counter == 1:
# At some point, "randomly" (here after ~1 sec), new tasks arrive
for i in range(number_of_tasks):
append_task(f"New task {i}", recursion_depth + 2 * i)
await asyncio.sleep(1) # No tasks, wait 1 sec
continue
else:
print("Stopping deamon after 5 sec, I now await all")
# We are waiting for all of them at the very end;
# in a real daemon this must be done better.
results = await asyncio.gather(*futures)
for (name, in_val), result in zip(in_vals, results):
print(f'-> [{name}] {in_val=}, {result=} (expected: {2*in_val})')
break
# Run task
name, val = name_val
print(f"<<< Deamon task to run: {name} {val}")
in_vals.append([name, val])
futures.append(asyncio.create_task(coro_executor(recursive, [name, val], {})))
print("Running in main code, no coroutines")
for in_val in [5, 6]:
out_val = recursive('test task', in_val)
print(f" -> {in_val=}, {out_val=} (expected: {2 * in_val})")
# "Submit" tasks (DB/RMQ replaced with JSON file, enough for here, not good for
# multiprocessing with many daemons)
for i in range(number_of_tasks):
append_task(f"Task {i}", recursion_depth + i)
print()
print("Running in parallel from a 'daemon'")
asyncio.run(daemon_run()) |
I have not seen the issue anymore after #6052, despite quite an intensive load on the daemon over the last two months, so I think it is safe to close this for the moment. |
Here is a diff of an attempt to use greenlets integrated into aiida-core:

commit c905ef63b9e285049d3dc553e0a8595d8437aea1
Author: Sebastiaan Huber <mail@sphuber.net>
Date: Wed Jun 14 08:31:59 2023 -0700
Fix using greenlets
diff --git a/aiida/engine/processes/functions.py b/aiida/engine/processes/functions.py
index cb6fb52b1..4123fc723 100644
--- a/aiida/engine/processes/functions.py
+++ b/aiida/engine/processes/functions.py
@@ -15,6 +15,7 @@ import functools
import inspect
import logging
import signal
+import sys
import types
import typing as t
from typing import TYPE_CHECKING
@@ -61,6 +62,65 @@ LOGGER = logging.getLogger(__name__)
FunctionType = t.TypeVar('FunctionType', bound=t.Callable[..., t.Any])
+import greenlet
+
+
+def await_(coroutine):
+ current = greenlet.getcurrent()
+
+ if not isinstance(current, AsyncIoGreenlet):
+ raise Exception('not running inside a greenlet right now, '
+ "can't use await_() function")
+
+ return current.driver.switch(coroutine)
+
+
+class AsyncIoGreenlet(greenlet.greenlet):
+
+ def __init__(self, driver, fn):
+ greenlet.greenlet.__init__(self, fn, driver)
+ self.driver = driver
+
+
+async def greenlet_spawn(__fn, *args, **kw):
+ target = AsyncIoGreenlet(greenlet.getcurrent(), __fn)
+
+ target_return = target.switch(*args, **kw)
+
+ while target:
+ try:
+ result = await target_return
+ except:
+ target_return = target.throw(*sys.exc_info())
+ else:
+ target_return = target.switch(result)
+
+ # clean up cycle for the common case
+ # (gc can do the exception case)
+ del target.driver
+ return target_return
+
+
+import asyncio
+
+
+def await_or_new_loop(coroutine):
+ current = greenlet.getcurrent()
+
+ if not isinstance(current, AsyncIoGreenlet):
+ print('creating main loop')
+ return asyncio.get_event_loop().run_until_complete(coroutine)
+ else:
+ print('reentring with greenlets')
+ return current.driver.switch(coroutine)
+
+
+async def coro_executor(f, *args, **kwargs):
+ # give a chance to switch context
+ await asyncio.sleep(0)
+ result = await greenlet_spawn(f, *args, **kwargs)
+ return result
+
def calcfunction(function: FunctionType) -> FunctionType:
"""
@@ -174,7 +234,7 @@ def process_function(node_class: t.Type['ProcessNode']) -> t.Callable[[FunctionT
signal.signal(kill_signal, kill_process)
try:
- result = process.execute()
+ result = await_or_new_loop(coro_executor(process.execute))
finally:
# If the `original_handler` is set, that means the `kill_process` was bound, which needs to be reset
if original_handler:
|
@danieleongari reports the following error when submitting ~200 workchains with aiida-core 1.6.1
the error goes away when adding a

time.sleep(2)

in between submissions (a minimal sketch of this throttled submission loop is given at the end).

Your environment
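A minimal sketch of the throttled submission loop mentioned above; the list of pre-built builders is assumed to exist, only the submit call and the pause are the point here.

import time
from aiida.engine import submit

for builder in builders:        # ~200 pre-built workchain builders (assumed)
    node = submit(builder)
    print(f'Submitted <{node.pk}>')
    time.sleep(2)               # the pause that makes the RecursionError go away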