Skip to content

Commit 30221b0

Browse files
committed
Fixed #253: added a timeout for threads to join
1 parent 31c9ce3 commit 30221b0

File tree

1 file changed

+12
-2
lines changed

1 file changed

+12
-2
lines changed

src/sotoki/utils/executor.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# -*- coding: utf-8 -*-
33
# vim: ai ts=4 sts=4 et sw=4 nu
44

5+
import datetime
56
import queue
67
import threading
78
from typing import Callable
@@ -12,6 +13,7 @@
1213
# Lock that ensures that new workers are not created while the interpreter is
1314
# shutting down. Must be held while mutating _threads_queues and _shutdown.
1415
_global_shutdown_lock = threading.Lock()
16+
thread_deadline_sec = 60
1517

1618

1719
def excepthook(args):
@@ -129,11 +131,19 @@ def join(self):
129131
"""Await completion of workers, requesting them to stop taking new task"""
130132
logger.debug(f"joining all threads for {self.prefix}")
131133
self.no_more = True
132-
for t in self._workers:
134+
for num, t in enumerate(self._workers):
135+
deadline = datetime.datetime.now() + datetime.timedelta(
136+
seconds=thread_deadline_sec
137+
)
138+
logger.debug(f"Giving {self.prefix}{num} {thread_deadline_sec}s to join")
133139
e = threading.Event()
134-
while t.is_alive():
140+
while t.is_alive() and datetime.datetime.now() < deadline:
135141
t.join(1)
136142
e.wait(timeout=2)
143+
if t.is_alive():
144+
logger.debug(f"Thread {self.prefix}{num} is not joining. Skipping…")
145+
else:
146+
logger.debug(f"Thread {self.prefix}{num} joined")
137147
logger.debug(f"all threads joined for {self.prefix}")
138148

139149
def release_halt(self):

0 commit comments

Comments
 (0)