Skip to content

Commit

Permalink
Merge pull request #165 from pycompression/release_1.5.0
Browse files Browse the repository at this point in the history
Release 1.5.0
  • Loading branch information
rhpvorderman authored Oct 12, 2023
2 parents c689da0 + 74883cf commit 95517bc
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 55 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.
version 1.5.0
-----------------
+ Make a special case for threads==1 in ``igzip_threaded.open`` for writing
files. This now combines the writing and compression thread for less
overhead.
+ Maximize time spent outside the GIL for ``igzip_threaded.open`` writing.
This has decreased wallclock time significantly.

version 1.4.1
-----------------
+ Fix several errors related to unclosed files and buffers.
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def build_isa_l():

setup(
name="isal",
version="1.4.1",
version="1.5.0",
description="Faster zlib and gzip compatible compression and "
"decompression by providing python bindings for the ISA-L "
"library.",
Expand Down
2 changes: 1 addition & 1 deletion src/isal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@
"__version__"
]

__version__ = "1.4.1"
__version__ = "1.5.0"
144 changes: 106 additions & 38 deletions src/isal/igzip_threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@


def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
encoding=None, errors=None, newline=None, *, threads=1):
encoding=None, errors=None, newline=None, *, threads=1,
block_size=1024 * 1024):
"""
Utilize threads to read and write gzip objects and escape the GIL.
Comparable to gzip.open. This method is only usable for streamed reading
Expand All @@ -39,6 +40,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
:param threads: If 0 will defer to igzip.open, if < 0 will use all threads
available to the system. Reading gzip can only
use one thread.
:param block_size: Determines how large the blocks in the read/write
queues are for threaded reading and writing.
:return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
depending on the mode.
"""
Expand All @@ -61,21 +64,27 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
else:
raise TypeError("filename must be a str or bytes object, or a file")
if "r" in mode:
gzip_file = io.BufferedReader(_ThreadedGzipReader(binary_file))
gzip_file = io.BufferedReader(
_ThreadedGzipReader(binary_file, block_size=block_size))
else:
gzip_file = io.BufferedWriter(
_ThreadedGzipWriter(binary_file, compresslevel, threads),
buffer_size=1024 * 1024
_ThreadedGzipWriter(
fp=binary_file,
block_size=block_size,
level=compresslevel,
threads=threads
),
buffer_size=block_size
)
if "t" in mode:
return io.TextIOWrapper(gzip_file, encoding, errors, newline)
return gzip_file


class _ThreadedGzipReader(io.RawIOBase):
def __init__(self, fp, queue_size=4, block_size=8 * 1024 * 1024):
def __init__(self, fp, queue_size=2, block_size=1024 * 1024):
self.raw = fp
self.fileobj = igzip._IGzipReader(fp, buffersize=8 * 1024 * 1024)
self.fileobj = igzip._IGzipReader(fp, buffersize=8 * block_size)
self.pos = 0
self.read_file = False
self.queue = queue.Queue(queue_size)
Expand Down Expand Up @@ -179,35 +188,53 @@ class _ThreadedGzipWriter(io.RawIOBase):
The writer thread reads from output queues and uses the crc32_combine
function to calculate the total crc. It also writes the compressed block.
When only one thread is requested, only the input queue is used and
compressing and output is handled in one thread.
"""
def __init__(self,
fp: BinaryIO,
level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
threads: int = 1,
queue_size: int = 2):
if level < 0 or level > 3:
raise ValueError(
f"Invalid compression level, "
f"level should be between 0 and 3: {level}")
queue_size: int = 1,
block_size: int = 1024 * 1024,
):
self.lock = threading.Lock()
self.exception: Optional[Exception] = None
self.raw = fp
self.level = level
self.previous_block = b""
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.index = 0
# Deflating random data results in an output a little larger than the
# input. Making the output buffer 10% larger is sufficient overkill.
compress_buffer_size = block_size + max(block_size // 10, 500)
self.block_size = block_size
self.compressors: List[isal_zlib._ParallelCompress] = [
isal_zlib._ParallelCompress(buffersize=compress_buffer_size,
level=level) for _ in range(threads)
]
if threads > 1:
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
queue.Queue(queue_size) for _ in range(threads)]
self.output_worker = threading.Thread(target=self._write)
self.compression_workers = [
threading.Thread(target=self._compress, args=(i,))
for i in range(threads)
]
elif threads == 1:
self.input_queues = [queue.Queue(queue_size)]
self.output_queues = []
self.compression_workers = []
self.output_worker = threading.Thread(
target=self._compress_and_write)
else:
raise ValueError(f"threads should be at least 1, got {threads}")
self.threads = threads
self.index = 0
self._crc = 0
self.running = False
self._size = 0
self.output_worker = threading.Thread(target=self._write)
self.compression_workers = [
threading.Thread(target=self._compress, args=(i,))
for i in range(threads)
]
self._closed = False
self._write_gzip_header()
self.start()
Expand Down Expand Up @@ -246,8 +273,19 @@ def write(self, b) -> int:
with self.lock:
if self.exception:
raise self.exception
index = self.index
length = b.nbytes if isinstance(b, memoryview) else len(b)
if length > self.block_size:
# write smaller chunks and return the result
memview = memoryview(b)
start = 0
total_written = 0
while start < length:
total_written += self.write(
memview[start:start+self.block_size])
start += self.block_size
return total_written
data = bytes(b)
index = self.index
zdict = memoryview(self.previous_block)[-DEFLATE_WINDOW_SIZE:]
self.previous_block = data
self.index += 1
Expand Down Expand Up @@ -289,6 +327,7 @@ def closed(self) -> bool:
def _compress(self, index: int):
in_queue = self.input_queues[index]
out_queue = self.output_queues[index]
compressor: isal_zlib._ParallelCompress = self.compressors[index]
while True:
try:
data, zdict = in_queue.get(timeout=0.05)
Expand All @@ -297,23 +336,11 @@ def _compress(self, index: int):
return
continue
try:
compressor = isal_zlib.compressobj(
self.level, wbits=-15, zdict=zdict)
compressed = compressor.compress(data) + compressor.flush(
isal_zlib.Z_SYNC_FLUSH)
crc = isal_zlib.crc32(data)
compressed, crc = compressor.compress_and_crc(data, zdict)
except Exception as e:
with self.lock:
self.exception = e
# Abort everything and empty the queue
in_queue.task_done()
self.running = False
while True:
try:
_ = in_queue.get(timeout=0.05)
in_queue.task_done()
except queue.Empty:
return
in_queue.task_done()
self._set_error_and_empty_queue(e, in_queue)
return
data_length = len(data)
out_queue.put((compressed, crc, data_length))
in_queue.task_done()
Expand Down Expand Up @@ -341,5 +368,46 @@ def _write(self):
output_queue.task_done()
index += 1

def _compress_and_write(self):
    """Compress blocks from the input queue and write them to the raw
    file object, all in one thread.

    Used only when ``threads == 1``: combining compression and writing
    in a single worker avoids the overhead of a separate output queue
    and writer thread.  Loops until ``self.running`` is set to False and
    the input queue is drained, then publishes the accumulated crc and
    uncompressed size on the instance (presumably consumed for the gzip
    trailer by code outside this view).

    On a compression error the exception is stored for the main thread
    and the queue is emptied so producers do not block forever.
    """
    # Idiomatic comparison; `if not self.threads == 1` obscured intent.
    if self.threads != 1:
        raise SystemError("Compress_and_write is for one thread only")
    fp = self.raw
    total_crc = 0
    size = 0
    in_queue = self.input_queues[0]
    compressor = self.compressors[0]
    while True:
        try:
            data, zdict = in_queue.get(timeout=0.05)
        except queue.Empty:
            if not self.running:
                # No more writers: store totals and finish the thread.
                self._crc = total_crc
                self._size = size
                return
            continue
        try:
            # zdict carries the tail of the previous block so deflate
            # can reference it as a dictionary.
            compressed, crc = compressor.compress_and_crc(data, zdict)
        except Exception as e:
            in_queue.task_done()
            self._set_error_and_empty_queue(e, in_queue)
            return
        data_length = len(data)
        # Fold this block's crc into the running total for the stream.
        total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
        size += data_length
        fp.write(compressed)
        in_queue.task_done()

def _set_error_and_empty_queue(self, error, q):
with self.lock:
self.exception = error
# Abort everything and empty the queue
self.running = False
while True:
try:
_ = q.get(timeout=0.05)
q.task_done()
except queue.Empty:
return

def writable(self) -> bool:
    """This stream supports writing; always returns True."""
    return True
4 changes: 4 additions & 0 deletions src/isal/isal_zlib.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ def adler32(__data, __value: int = 1) -> int: ...
def crc32(__data, __value: int = 0) -> int: ...
def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ...

class _ParallelCompress:
    # Reusable compressor state with a preallocated output buffer of
    # `buffersize` bytes, created once per worker thread so blocks can
    # be compressed without per-call allocation.
    def __init__(self, buffersize: int, level: int): ...
    # Compress __data (using __zdict as the deflate dictionary) and
    # return the compressed bytes together with the crc32 of the
    # uncompressed data — callers combine these crcs per block.
    def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ...

def compress(__data,
level: int = ISAL_DEFAULT_COMPRESSION,
wbits: int = MAX_WBITS) -> bytes: ...
Expand Down
Loading

0 comments on commit 95517bc

Please sign in to comment.