Skip to content

Commit

Permalink
Fix a race condition of checking vs adding results (#535)
Browse files Browse the repository at this point in the history
Specifically, adding results was queued in a job executor, while
checking results was directly called by the worker threads.
If a worker thread checks before the executor has added the results,
it is possible to get into a deadlock. The deadlock
arises from the fact that `stop` is then never called
and the main thread continues to wait for its END_Q signal.
  • Loading branch information
PGijsbers authored Jun 18, 2023
1 parent da57a67 commit 85c143e
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions amlb/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import queue
import signal
import threading
from functools import partial
from typing import Callable, List, Optional

from .utils import Namespace, Timer, ThreadSafeCounter, InterruptTimeout, is_main_thread, raise_in_thread, signal_handler
Expand Down Expand Up @@ -354,17 +355,28 @@ def __init__(self, jobs: List,
self._queueing_strategy = queueing_strategy
self._interrupt = threading.Event()
self._exec = None
self.futures = []

def _add_result(self, result):
sup_call = super()._add_result
def _safe_call_from_exec(self, fn):
if self._exec:
self._exec.submit(sup_call, result)
future = self._exec.submit(fn)
self.futures.append(future)
else:
log.warning("Application is submitting a function while the thread executor is not running: executing the function in the calling thread.")
log.warning(
"Application is submitting a function while the thread executor is not running: executing the function in the calling thread.")
try:
sup_call(result)
except:
pass
fn()
except Exception as e:
log.exception(e)

def _add_result(self, result):
sup_call = partial(super()._add_result, result)
self._safe_call_from_exec(sup_call)

def stop_if_complete(self):
# Direct calls introduce a race condition by the queued '_add_result' calls
sup_call = super().stop_if_complete
self._safe_call_from_exec(sup_call)

def _run(self):
q = queue.Queue()
Expand Down

0 comments on commit 85c143e

Please sign in to comment.