From 85c143ee2bd63ddcde9731ca311267c79aa65f96 Mon Sep 17 00:00:00 2001
From: Pieter Gijsbers
Date: Sun, 18 Jun 2023 12:43:24 +0300
Subject: [PATCH] Fix a race condition of checking vs adding results (#535)
Specifically, adding results was queued in a job executor, while
checking results was directly called by the worker threads.
If the worker thread checks before the executor had added results,
it is possible to get into a deadlock condition. The deadlock
arises from the fact that the `stop` condition is never called
and the main thread will continue to wait for its END_Q signal.
---
amlb/job.py | 26 +++++++++++++++++++-------
1 file changed, 19 insertions(+), 7 deletions(-)
diff --git a/amlb/job.py b/amlb/job.py
index c15789604..6701f033d 100644
--- a/amlb/job.py
+++ b/amlb/job.py
@@ -14,6 +14,7 @@
import queue
import signal
import threading
+from functools import partial
from typing import Callable, List, Optional
from .utils import Namespace, Timer, ThreadSafeCounter, InterruptTimeout, is_main_thread, raise_in_thread, signal_handler
@@ -354,17 +355,28 @@ def __init__(self, jobs: List,
self._queueing_strategy = queueing_strategy
self._interrupt = threading.Event()
self._exec = None
+ self.futures = []
- def _add_result(self, result):
- sup_call = super()._add_result
+ def _safe_call_from_exec(self, fn):
if self._exec:
- self._exec.submit(sup_call, result)
+ future = self._exec.submit(fn)
+ self.futures.append(future)
else:
- log.warning("Application is submitting a function while the thread executor is not running: executing the function in the calling thread.")
+ log.warning(
+ "Application is submitting a function while the thread executor is not running: executing the function in the calling thread.")
try:
- sup_call(result)
- except:
- pass
+ fn()
+ except Exception as e:
+ log.exception(e)
+
+ def _add_result(self, result):
+ sup_call = partial(super()._add_result, result)
+ self._safe_call_from_exec(sup_call)
+
+ def stop_if_complete(self):
+ # Direct calls introduce a race condition by the queued '_add_result' calls
+ sup_call = super().stop_if_complete
+ self._safe_call_from_exec(sup_call)
def _run(self):
q = queue.Queue()