Skip to content

Commit

Permalink
Merge pull request #58 from pycompression/fixdeadlock
Browse files Browse the repository at this point in the history
Fixdeadlock
  • Loading branch information
rhpvorderman authored Sep 24, 2024
2 parents 6b942fb + 4a25e32 commit b0728bb
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 35 deletions.
7 changes: 3 additions & 4 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.
version 0.5.1
-----------------
+ Fix a bug where ``gzip_ng_threaded.open`` could cause a hang on program
  exit when the returned file object was not used with a context
  manager.
+ Threaded reading and writing no longer block program exit when an
  exception occurs in the main thread.

version 0.5.0
-----------------
Expand Down
51 changes: 21 additions & 30 deletions src/zlib_ng/gzip_ng_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
self.block_size = block_size
self.worker = threading.Thread(target=self._decompress)
self._closed = False
self.running = False
self.running = True
self._calling_thread = threading.current_thread()
self.worker.start()

def _check_closed(self, msg=None):
if self._closed:
Expand All @@ -109,34 +111,23 @@ def _check_closed(self, msg=None):
def _decompress(self):
block_size = self.block_size
block_queue = self.queue
while self.running:
while self.running and self._calling_thread.is_alive():
try:
data = self.fileobj.read(block_size)
except Exception as e:
self.exception = e
return
if not data:
return
while self.running:
while self.running and self._calling_thread.is_alive():
try:
block_queue.put(data, timeout=0.05)
break
except queue.Full:
pass

def _start(self):
if not self.running:
self.running = True
self.worker.start()

def _stop(self):
if self.running:
self.running = False
self.worker.join()

def readinto(self, b):
self._check_closed()
self._start()
result = self.buffer.readinto(b)
if result == 0:
while True:
Expand Down Expand Up @@ -164,7 +155,8 @@ def tell(self) -> int:
def close(self) -> None:
if self._closed:
return
self._stop()
self.running = False
self.worker.join()
self.fileobj.close()
if self.closefd:
self.raw.close()
Expand Down Expand Up @@ -224,6 +216,7 @@ def __init__(self,
if "b" not in mode:
mode += "b"
self.lock = threading.Lock()
self._calling_thread = threading.current_thread()
self.exception: Optional[Exception] = None
self.level = level
self.previous_block = b""
Expand Down Expand Up @@ -261,6 +254,7 @@ def __init__(self,
self.raw, self.closefd = open_as_binary_stream(filename, mode)
self._closed = False
self._write_gzip_header()
self.start()

def _check_closed(self, msg=None):
if self._closed:
Expand All @@ -283,24 +277,21 @@ def _write_gzip_header(self):
self.raw.write(struct.pack(
"BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl))

def start(self):
    """Start the output worker and all compression worker threads.

    Sets ``self.running`` before starting the threads so their
    ``while``-loops observe the flag as true from the first iteration.
    Called once from ``__init__``; callers are not expected to call it
    again after ``stop()``.
    """
    self.running = True
    self.output_worker.start()
    for worker in self.compression_workers:
        worker.start()

def stop(self):
    """Stop, but do not care for remaining work.

    Clears ``self.running`` so the worker loops exit, then joins the
    compression workers and finally the output worker. Intentionally
    unconditional: joining already-finished threads is a no-op, so this
    is safe to call more than once as long as the threads were started.
    """
    self.running = False
    for worker in self.compression_workers:
        worker.join()
    self.output_worker.join()

def write(self, b) -> int:
self._check_closed()
self._start()
with self.lock:
if self.exception:
raise self.exception
Expand Down Expand Up @@ -360,7 +351,7 @@ def _compress(self, index: int):
in_queue = self.input_queues[index]
out_queue = self.output_queues[index]
compressor: zlib_ng._ParallelCompress = self.compressors[index]
while True:
while self._calling_thread.is_alive():
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
Expand All @@ -383,7 +374,7 @@ def _write(self):
fp = self.raw
total_crc = 0
size = 0
while True:
while self._calling_thread.is_alive():
out_index = index % self.threads
output_queue = output_queues[out_index]
try:
Expand All @@ -408,7 +399,7 @@ def _compress_and_write(self):
size = 0
in_queue = self.input_queues[0]
compressor = self.compressors[0]
while True:
while self._calling_thread.is_alive():
try:
data, zdict = in_queue.get(timeout=0.05)
except queue.Empty:
Expand Down
1 change: 0 additions & 1 deletion tests/test_gzip_ng_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ def test_threaded_write_error(threads):
threads=threads, block_size=8 * 1024)
# Bypass the write method which should not allow blocks larger than
# block_size.
f._start()
f.input_queues[0].put((os.urandom(1024 * 64), b""))
with pytest.raises(OverflowError) as error:
f.close()
Expand Down

0 comments on commit b0728bb

Please sign in to comment.