Skip to content

Commit

Permalink
Fix infinite while loops with a thread is alive check
Browse files Browse the repository at this point in the history
  • Loading branch information
rhpvorderman committed Sep 24, 2024
1 parent 9f46857 commit 4a25e32
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.
version 0.5.1-dev
-----------------
+ Threaded reading and writing do no longer block exiting when an exception
occurs in the main thread.

version 0.5.0
-----------------
+ Wheels are now build for MacOS arm64 architectures.
Expand Down
12 changes: 7 additions & 5 deletions src/zlib_ng/gzip_ng_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
self.worker = threading.Thread(target=self._decompress)
self._closed = False
self.running = True
self._calling_thread = threading.current_thread()
self.worker.start()

def _check_closed(self, msg=None):
Expand All @@ -110,15 +111,15 @@ def _check_closed(self, msg=None):
def _decompress(self):
block_size = self.block_size
block_queue = self.queue
while self.running:
while self.running and self._calling_thread.is_alive():
try:
data = self.fileobj.read(block_size)
except Exception as e:
self.exception = e
return
if not data:
return
while self.running:
while self.running and self._calling_thread.is_alive():
try:
block_queue.put(data, timeout=0.05)
break
Expand Down Expand Up @@ -215,6 +216,7 @@ def __init__(self,
if "b" not in mode:
mode += "b"
self.lock = threading.Lock()
self._calling_thread = threading.current_thread()
self.exception: Optional[Exception] = None
self.level = level
self.previous_block = b""
Expand Down Expand Up @@ -349,7 +351,7 @@ def _compress(self, index: int):
in_queue = self.input_queues[index]
out_queue = self.output_queues[index]
compressor: zlib_ng._ParallelCompress = self.compressors[index]
while True:
while self._calling_thread.is_alive():
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
Expand All @@ -372,7 +374,7 @@ def _write(self):
fp = self.raw
total_crc = 0
size = 0
while True:
while self._calling_thread.is_alive():
out_index = index % self.threads
output_queue = output_queues[out_index]
try:
Expand All @@ -397,7 +399,7 @@ def _compress_and_write(self):
size = 0
in_queue = self.input_queues[0]
compressor = self.compressors[0]
while True:
while self._calling_thread.is_alive():
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
Expand Down

0 comments on commit 4a25e32

Please sign in to comment.