Faust Agent & Process hangs when using topic.take() #262

Closed
2 tasks done
nemosupremo opened this issue Jan 3, 2019 · 4 comments · Fixed by #265

Comments

@nemosupremo
Contributor

nemosupremo commented Jan 3, 2019

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

https://gist.github.com/nemosupremo/2f7a12119c5106c6624cd8d7467e8e77

Run this worker; it crashes and never recovers. An interrupt (Ctrl+C) then causes an indefinite hang. Note that this doesn't happen with topic.items(); it only occurs with topic.take(x, within=y).

Expected behavior

The agent restarts and/or crashes the application.

Actual behavior

The application hangs indefinitely.

Full traceback

n/a

Versions

  • Python version 3.6.7
  • Faust version 1.4.2 & master
  • Operating system OS X 10.13.6 & Ubuntu 16.04
  • Kafka version 1.1.1
  • RocksDB version (if applicable) n/a
@nemosupremo
Contributor Author

nemosupremo commented Jan 3, 2019

Here is the full stack trace from pyrasite:

Thread 0x7fd9cffff700
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/concurrent/futures/thread.py", line 67, in _worker
    work_item = work_queue.get(block=True)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

Thread 0x7fd9e4b92700
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/concurrent/futures/thread.py", line 67, in _worker
    work_item = work_queue.get(block=True)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

Thread 0x7fd9e53d3700
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/concurrent/futures/thread.py", line 67, in _worker
    work_item = work_queue.get(block=True)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

Thread 0x7fd9e5c14700
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/concurrent/futures/thread.py", line 67, in _worker
    work_item = work_queue.get(block=True)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

Thread 0x7fd9e6495700
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/concurrent/futures/thread.py", line 67, in _worker
    work_item = work_queue.get(block=True)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

Thread 0x7fd9e734a700
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/concurrent/futures/thread.py", line 67, in _worker
    work_item = work_queue.get(block=True)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

Thread 0x7fd9e7b8b700
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/concurrent/futures/thread.py", line 67, in _worker
    work_item = work_queue.get(block=True)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/queue.py", line 164, in get
    self.not_empty.wait()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/threading.py", line 295, in wait
    waiter.acquire()

Thread 0x7fd9f0f0f700
  File "fpipeline.py", line 60, in <module>
    main()
  File "fpipeline.py", line 57, in main
    app.main()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/faust/app/base.py", line 581, in main
    cli(app=self)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/faust/cli/base.py", line 463, in _inner
    return cmd()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/faust/cli/base.py", line 538, in __call__
    self.run_using_worker(*args, **kwargs)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/faust/cli/base.py", line 547, in run_using_worker
    return worker.execute_from_commandline()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/site-packages/mode/worker.py", line 191, in execute_from_commandline
    self.loop.run_until_complete(self.start())
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/asyncio/base_events.py", line 460, in run_until_complete
    self.run_forever()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/asyncio/base_events.py", line 427, in run_forever
    self._run_once()
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/asyncio/base_events.py", line 1404, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/ubuntu/.pyenv/versions/3.6.7/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
  File "<string>", line 1, in <module>
  File "<string>", line 5, in <module>

Posted it before looking it over; doesn't look useful.
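The dump above shows only idle ThreadPoolExecutor workers (blocked in work_queue.get) and the main thread parked in epoll.poll, which is what an asyncio loop looks like when every coroutine is waiting on something. That suggests the stuck await lives in a task's coroutine stack, which a thread dump cannot show. A hedged stdlib sketch of dumping task stacks instead, using asyncio.all_tasks() and Task.print_stack(); the stuck() coroutine is a hypothetical stand-in for the hung agent:

```python
import asyncio
import io


async def dump_task_stacks() -> str:
    """Return the current stack of every pending task except the caller."""
    out = io.StringIO()
    current = asyncio.current_task()
    for task in asyncio.all_tasks():
        if task is not current:
            task.print_stack(file=out)
    return out.getvalue()


async def stuck(event: asyncio.Event) -> None:
    # Hypothetical agent waiting on something that never happens.
    await event.wait()


async def main() -> str:
    event = asyncio.Event()
    task = asyncio.create_task(stuck(event))
    await asyncio.sleep(0)  # let the task start and block on event.wait()
    report = await dump_task_stacks()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return report


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a real worker, something like dump_task_stacks() would have to be triggered from inside the running loop (for example from a signal handler registered with loop.add_signal_handler); that wiring is left out here.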

@nemosupremo
Contributor Author

It seems that wait_empty gets stuck in an infinite loop.

Log output with DEVLOG=true:

[2019-01-07 09:09:21,823: INFO]: [^--Consumer]: STILL WAITING FOR ALL STREAMS TO FINISH
[2019-01-07 09:09:21,823: INFO]: [^--Consumer]: WAITING FOR 1 EVENTS
[2019-01-07 09:09:22,847: INFO]: [^--Consumer]: STILL WAITING FOR ALL STREAMS TO FINISH
[2019-01-07 09:09:22,847: INFO]: [^--Consumer]: WAITING FOR 1 EVENTS
[2019-01-07 09:09:23,873: INFO]: [^--Consumer]: STILL WAITING FOR ALL STREAMS TO FINISH
[2019-01-07 09:09:23,874: INFO]: [^--Consumer]: WAITING FOR 1 EVENTS
[2019-01-07 09:09:24,900: INFO]: [^--Consumer]: STILL WAITING FOR ALL STREAMS TO FINISH
[2019-01-07 09:09:24,901: INFO]: [^--Consumer]: WAITING FOR 1 EVENTS
[2019-01-07 09:09:25,928: INFO]: [^--Consumer]: STILL WAITING FOR ALL STREAMS TO FINISH
[2019-01-07 09:09:25,929: INFO]: [^--Consumer]: WAITING FOR 1 EVENTS
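The log pattern is consistent with a loop that polls the number of in-flight (unacknowledged) events and only exits once that number reaches zero. A minimal model of such a loop, assuming a set of unacked offsets; AckTracker, its poll parameter and shutdown_finishes() are hypothetical names for illustration, not Faust's Consumer API:

```python
import asyncio
import logging
from typing import Set

logger = logging.getLogger("consumer-sketch")


class AckTracker:
    """Hypothetical model of a consumer that tracks unacknowledged events."""

    def __init__(self) -> None:
        self.unacked: Set[int] = set()

    def track(self, offset: int) -> None:
        self.unacked.add(offset)

    def ack(self, offset: int) -> None:
        self.unacked.discard(offset)

    async def wait_empty(self, poll: float = 1.0) -> None:
        # Polls until every tracked event is acked. Nothing here gives up,
        # so a single event that is never acked keeps this loop alive forever.
        while self.unacked:
            logger.info("STILL WAITING FOR ALL STREAMS TO FINISH")
            logger.info("WAITING FOR %d EVENTS", len(self.unacked))
            await asyncio.sleep(poll)


async def shutdown_finishes(acked: bool, limit: float = 0.2) -> bool:
    """Return True if wait_empty() completes within `limit` seconds."""
    tracker = AckTracker()
    tracker.track(1)  # one event handed to the (crashed) agent
    if acked:
        tracker.ack(1)
    try:
        await asyncio.wait_for(tracker.wait_empty(poll=0.01), timeout=limit)
    except asyncio.TimeoutError:
        return False
    return True


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(shutdown_finishes(acked=False)))
```

With acked=False the model prints the same pair of lines over and over until the outer timeout, which is the shape of the log above; without an outer timeout it would never return.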

@nemosupremo
Contributor Author

nemosupremo commented Jan 7, 2019

I think this could be the result of using try/finally in an (async) generator. PEP 255 (Simple Generators) states:

As noted earlier, yield is not allowed in the try clause of a try/finally construct. A consequence is that generators should allocate critical resources with great care. There is no restriction on yield otherwise appearing in finally clauses, except clauses, or in the try clause of a try/except construct:

I'm not sure whether this applies to async generators.

faust/faust/streams.py

Lines 338 to 341 in 5d8152d

```python
try:
    yield list(buffer)
finally:
    buffer.clear()
```

In any case, the finally clause here does not execute, which deadlocks the generator because the dead message is never acked.
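For what it's worth, the PEP 255 restriction quoted above was lifted by PEP 342 (Python 2.5), so yield inside try/finally is legal in both plain and async generators. What matters is when the finally runs: an async generator left suspended at a yield (for example because the consumer raised) only runs its pending finally when it is closed, via aclose() or when asyncio finalizes it. A small stdlib sketch; events() and demo() are hypothetical names, and the finally clause stands in for the ack / buffer.clear() in streams.py:

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def events(log: List[str]) -> AsyncIterator[int]:
    for offset in range(3):
        try:
            yield offset
        finally:
            # Stands in for acking the event / buffer.clear() in streams.py.
            log.append(f"acked {offset}")


async def demo() -> Tuple[List[str], List[str]]:
    log: List[str] = []
    agen = events(log)  # a long-lived reference keeps the generator alive
    try:
        async for offset in agen:
            if offset == 1:
                raise KeyError("f")  # the agent crashes mid-iteration
    except KeyError:
        log.append("agent crashed")
    await asyncio.sleep(0.01)  # time passing alone does not run the finally
    before_close = list(log)
    await agen.aclose()  # throws GeneratorExit into the suspended yield
    return before_close, list(log)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Offset 0 is acked because the loop resumed the generator normally; offset 1 stays unacked until aclose() is awaited, which is the same state as the "WAITING FOR 1 EVENTS" log above.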

However, I don't understand why this doesn't happen during normal iteration in __aiter__.

Possibly related to python-trio/trio#265?

@nemosupremo
Contributor Author

nemosupremo commented Jan 8, 2019

Ignore my last comment

```python
import asyncio
import uvloop

async def sample():
    for i in range(10):
        try:
            yield i
        finally:
            print(f"finished iter of {i}")

async def main():
    try:
        async for val in sample():
            print(val)
            if val > 5:
                x = {"a": "b"}
                y = x["f"]  # deliberately raises KeyError
    except Exception:
        print("exception")
        raise
    finally:
        print("cleaning up main")

if __name__ == "__main__":
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    loop = asyncio.get_event_loop()
    # Blocking call which returns when the main() coroutine is done
    loop.run_until_complete(main())
    loop.close()
```

The example above works completely fine. I'm not sure why the finalizer doesn't seem to execute in the Faust case. Also, after adding an await asyncio.sleep(5) before crasherfunc, I now see that the finalizer is running; it seems whatever is hooking stdout was swallowing the print messages.
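The example above relies on how abandoned async generators get closed. When the body of an async for raises, the generator is left suspended at its yield. On CPython it becomes garbage right away, and with the default asyncio loop the async-generator finalizer hook (installed through sys.set_asyncgen_hooks) schedules its aclose() as a task, so the pending finally runs a loop iteration or two later rather than at the moment of the exception; loop.shutdown_asyncgens(), which asyncio.run() calls on exit, closes any generators still alive. A stdlib sketch of that ordering, reusing the example's sample() with a list instead of print(); crash_in_loop() and main() are hypothetical helpers:

```python
import asyncio
from typing import AsyncIterator, List


async def sample(log: List[str]) -> AsyncIterator[int]:
    # Same shape as sample() in the example above.
    for i in range(10):
        try:
            yield i
        finally:
            log.append(f"finished iter of {i}")


async def crash_in_loop(log: List[str]) -> None:
    # No variable holds the generator, so on CPython it becomes garbage as
    # soon as the exception unwinds the `async for` statement.
    try:
        async for val in sample(log):
            if val > 5:
                raise KeyError("f")
    except KeyError:
        log.append("exception")


async def main() -> List[str]:
    log: List[str] = []
    await crash_in_loop(log)
    # The finally for i == 6 has not run yet; the finalizer hook has only
    # scheduled the generator's aclose() for a later loop iteration.
    for _ in range(10):
        if "finished iter of 6" in log:
            break
        await asyncio.sleep(0)  # let the scheduled aclose() task run
    else:
        # Fallback if the generator was not finalized yet (no refcounting).
        await asyncio.get_running_loop().shutdown_asyncgens()
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the pending finally runs on a later loop iteration, anything that stops the loop or keeps a reference to the generator before then can keep that cleanup from happening.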

nemosupremo added a commit to nemosupremo/faust that referenced this issue Jan 8, 2019
ask closed this as completed in #265 Jan 14, 2019
ask added a commit that referenced this issue Jan 14, 2019