Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Executor not executing tasks if there are no ready entities in the wait set #272

Merged
merged 2 commits into from
Feb 11, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def create_task(self, callback, *args, **kwargs):
task = Task(callback, args, kwargs, executor=self)
with self._tasks_lock:
self._tasks.append((task, None, None))
_rclpy.rclpy_trigger_guard_condition(self._guard_condition)
# Task inherits from Future
return task

Expand Down Expand Up @@ -363,20 +364,22 @@ def _wait_for_ready_callbacks(self, timeout_sec=None, nodes=None):
if nodes is None:
nodes = self.get_nodes()

# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in reversed(tasks):
if not task.executing() and not task.done() and (node is None or node in nodes):
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))

yielded_work = False
while not yielded_work and not self._is_shutdown:
# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in reversed(tasks):
if (not task.executing() and not task.done() and
(node is None or node in nodes)):
yielded_work = True
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))

# Gather entities that can be waited on
subscriptions = []
guards = []
Expand Down
35 changes: 35 additions & 0 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import threading
import time
import unittest

Expand Down Expand Up @@ -221,6 +222,40 @@ async def coro2():
self.assertTrue(future2.done())
self.assertEqual('Sentinel Result 2', future2.result())

def test_create_task_during_spin(self):
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

future = None

def spin_until_task_done(executor):
nonlocal future
while future is None or future.done():
jacobperron marked this conversation as resolved.
Show resolved Hide resolved
try:
executor.spin_once()
finally:
executor.shutdown()
break

# Start spinning in a separate thread
thr = threading.Thread(target=spin_until_task_done, args=(executor, ), daemon=True)
thr.start()

def func():
return 'Sentinel Result'

# Create a task
future = executor.create_task(func)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be a race where this test could have a false negative if spin_until_task_done has not reached the while loop in _wait_for_ready_callbacks prior to this task being created.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think you're right. I'm not sure how to avoid it, but it can be mitigated by sleeping for a short time before creating the task.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See 1ee031a


thr.join(timeout=0.5)
# If the join timed out, remove the node to cause the spin thread to stop
if thr.is_alive():
executor.remove_node(self.node)

self.assertTrue(future.done())
self.assertEqual('Sentinel Result', future.result())

def test_global_executor_completes_async_task(self):
self.assertIsNotNone(self.node.handle)

Expand Down