From 8c5980dca18340d74042a794ad5942d4c0af6414 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Mon, 9 Oct 2023 07:51:51 +0200 Subject: [PATCH 01/24] Update version --- setup.py | 2 +- src/isal/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 39d4f481..7aebac23 100644 --- a/setup.py +++ b/setup.py @@ -135,7 +135,7 @@ def build_isa_l(): setup( name="isal", - version="1.4.1", + version="1.5.0-dev", description="Faster zlib and gzip compatible compression and " "decompression by providing python bindings for the ISA-L " "library.", diff --git a/src/isal/__init__.py b/src/isal/__init__.py index 2efb29d8..64ba0dbb 100644 --- a/src/isal/__init__.py +++ b/src/isal/__init__.py @@ -27,4 +27,4 @@ "__version__" ] -__version__ = "1.4.1" +__version__ = "1.5.0-dev" From 07584bed65398531ef677a110661cc1d0f5baa0d Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Mon, 9 Oct 2023 11:15:54 +0200 Subject: [PATCH 02/24] Create a separate function that calculates the decompressed block and the crc outside the GIL --- src/isal/igzip_threaded.py | 7 +- src/isal/isal_zlib.pyi | 4 ++ src/isal/isal_zlibmodule.c | 123 +++++++++++++++++++++++++++++++++++ tests/test_igzip_threaded.py | 7 +- 4 files changed, 133 insertions(+), 8 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index e4cd5fc5..244ee6b3 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -297,11 +297,8 @@ def _compress(self, index: int): return continue try: - compressor = isal_zlib.compressobj( - self.level, wbits=-15, zdict=zdict) - compressed = compressor.compress(data) + compressor.flush( - isal_zlib.Z_SYNC_FLUSH) - crc = isal_zlib.crc32(data) + compressed, crc = isal_zlib._parallel_deflate_and_crc( + data, zdict, self.level) except Exception as e: with self.lock: self.exception = e diff --git a/src/isal/isal_zlib.pyi b/src/isal/isal_zlib.pyi index 53caf837..d2126932 100644 --- a/src/isal/isal_zlib.pyi +++ b/src/isal/isal_zlib.pyi @@ -38,6 +38,10 @@ def adler32(__data, __value: int = 1) -> int: ... def crc32(__data, __value: int = 0) -> int: ... def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ... +def _parallel_deflate_and_crc(__data, __zdict, level + ) -> typing.Tuple[bytes, int]: ... + + def compress(__data, level: int = ISAL_DEFAULT_COMPRESSION, wbits: int = MAX_WBITS) -> bytes: ... diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index 35e77fe1..a2fff22b 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -288,6 +288,128 @@ isal_zlib_crc32_combine(PyObject *module, PyObject *args) { crc32_comb(crc1, crc2, crc2_length) & 0xFFFFFFFF); } +PyDoc_STRVAR(isal_zlib_parallel_deflate_and_crc__doc__, +"parallel_deflate_and_crc($module, data, zdict, /, level)\n" +"--\n" +"\n" +"Function specifically designed for use in parallel compression. Data is \n" +"compressed using deflate and Z_SYNC_FLUSH is used to ensure the block aligns\n" +"to a byte boundary. Also the CRC is calculated. This function is designed to \n" +"maximize the time spent outside the GIL\n" +"\n" +" data\n" +" bytes-like object containing the to be compressed data\n" +" zdict\n" +" last 32 bytes of the previous block\n" +" level\n" +" the compression level to use.\n" +); +#define ISAL_ZLIB_PARALLEL_DEFLATE_AND_CRC_METHODDEF \ + { \ + "_parallel_deflate_and_crc", (PyCFunction)(void (*)(void))isal_zlib_parallel_deflate_and_crc, \ + METH_VARARGS | METH_KEYWORDS, isal_zlib_parallel_deflate_and_crc__doc__ \ + } + +static PyObject * +isal_zlib_parallel_deflate_and_crc(PyObject *module, PyObject *args, PyObject *kwargs) +{ + Py_buffer data; + Py_buffer zdict; + int level = ISAL_DEFAULT_COMPRESSION; + static char *keywords[] = {"", "", "level"}; + static char *format = "y*y*|i:isal_zlib.parallel_deflate_and_crc"; + PyObject *out_bytes = NULL; + uint8_t *level_buf = NULL; + + if (PyArg_ParseTupleAndKeywords( + args, kwargs, format, keywords, &data, &zdict, &level) < 0) { + return NULL; + } + + if (data.len > UINT32_MAX) { + PyErr_Format(PyExc_OverflowError, + "Can only compress %d bytes of data", UINT32_MAX); + goto error; + } + + uint32_t level_buf_size; + if (mem_level_to_bufsize(level, MEM_LEVEL_DEFAULT, &level_buf_size) != 0) { + PyErr_SetString(IsalError, "Invalid compression level"); + goto error; + } + + level_buf = (uint8_t *)PyMem_Malloc(level_buf_size); + if (level_buf == NULL) { + PyErr_NoMemory(); + goto error; + } + // Assume output size < input_size. But just to be sure add 350 safety + // bytes per 64K of input. + Py_ssize_t output_size = data.len + (((data.len >> 16) + 1) * 350); + if (output_size > UINT32_MAX) { + PyErr_SetNone(PyExc_OverflowError); + goto error; + } + out_bytes = PyBytes_FromStringAndSize(NULL, output_size); + if (out_bytes == NULL) { + PyErr_NoMemory(); + goto error; + } + uint8_t *out_ptr = (uint8_t *)PyBytes_AS_STRING(out_bytes); + int err; + struct isal_zstream zst; + isal_deflate_init(&zst); + zst.level = (uint32_t)level; + zst.level_buf = level_buf; + zst.level_buf_size = level_buf_size; + zst.hist_bits = ISAL_DEF_MAX_HIST_BITS; + zst.gzip_flag = IGZIP_DEFLATE; + zst.avail_in = data.len; + zst.next_in = data.buf; + zst.next_out = out_ptr; + zst.avail_out = output_size; + zst.flush = SYNC_FLUSH; + err = isal_deflate_set_dict(&zst, zdict.buf, zdict.len); + if (err != 0){ + isal_deflate_error(err); + return NULL; + } + uint32_t crc; + Py_BEGIN_ALLOW_THREADS + err = isal_deflate(&zst); + crc = crc32_gzip_refl(0, data.buf, data.len); + Py_END_ALLOW_THREADS + + if (err != COMP_OK) { + isal_deflate_error(err); + goto error; + } + if (zst.avail_in != 0) { + PyErr_Format( + PyExc_RuntimeError, + "Developer error input bytes are still available: %u. " + "Please contact the developers by creating an issue at " + "https://github.com/pycompression/python-isal/issues", + zst.avail_in); + goto error; + } + + if (_PyBytes_Resize(&out_bytes, zst.next_out - out_ptr) < 0) { + goto error; + } + PyBuffer_Release(&data); + PyBuffer_Release(&zdict); + PyMem_Free(level_buf); + return Py_BuildValue("(OI)", out_bytes, crc); +error: + PyMem_Free(level_buf); + Py_XDECREF(out_bytes); + PyBuffer_Release(&data); + PyBuffer_Release(&zdict); + return NULL; +} + + PyDoc_STRVAR(zlib_compress__doc__, "compress($module, data, /, level=ISAL_DEFAULT_COMPRESSION, wbits=MAX_WBITS)\n" "--\n" @@ -1972,6 +2094,7 @@ static PyMethodDef IsalZlibMethods[] = { ISAL_ZLIB_ADLER32_METHODDEF, ISAL_ZLIB_CRC32_METHODDEF, ISAL_ZLIB_CRC32_COMBINE_METHODDEF, + ISAL_ZLIB_PARALLEL_DEFLATE_AND_CRC_METHODDEF, ISAL_ZLIB_COMPRESS_METHODDEF, ISAL_ZLIB_DECOMPRESS_METHODDEF, ISAL_ZLIB_COMPRESSOBJ_METHODDEF, diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index a5c9128d..7ae136e0 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -75,12 +75,13 @@ def test_threaded_read_error(): @pytest.mark.timeout(5) def test_threaded_write_error(monkeypatch): tmp = tempfile.mktemp() - # Compressobj method is called in a worker thread. - monkeypatch.delattr(igzip_threaded.isal_zlib, "compressobj") + # parallel_deflate_and_crc method is called in a worker thread. + monkeypatch.delattr(igzip_threaded.isal_zlib, + "_parallel_deflate_and_crc") with pytest.raises(AttributeError) as error: with igzip_threaded.open(tmp, "wb", compresslevel=3) as writer: writer.write(b"x") - error.match("no attribute 'compressobj'") + error.match("no attribute '_parallel_deflate_and_crc'") def test_close_reader(): From fed1d1d8d038f055f910c6dad0bc3e7c6f5b7020 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 10 Oct 2023 08:59:40 +0200 Subject: [PATCH 03/24] Use a separate object with reusable buffers --- src/isal/igzip_threaded.py | 24 +++-- src/isal/isal_zlib.pyi | 6 +- src/isal/isal_zlibmodule.c | 196 ++++++++++++++++++++++------------- tests/test_igzip_threaded.py | 15 ++- 4 files changed, 151 insertions(+), 90 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 244ee6b3..d135deb2 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -63,8 +63,17 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, if "r" in mode: gzip_file = io.BufferedReader(_ThreadedGzipReader(binary_file)) else: + write_buffer_size = 1024 * 1024 + # Deflating random data results in an output a little larger than the + # input. Making the output buffer 10% larger is sufficient overkill. + compress_buffer_size = write_buffer_size + max( + write_buffer_size // 10, 500) gzip_file = io.BufferedWriter( - _ThreadedGzipWriter(binary_file, compresslevel, threads), + _ThreadedGzipWriter( + fp=binary_file, + buffer_size=compress_buffer_size, + level=compresslevel, + threads=threads), buffer_size=1024 * 1024 ) if "t" in mode: @@ -182,18 +191,19 @@ class _ThreadedGzipWriter(io.RawIOBase): """ def __init__(self, fp: BinaryIO, + buffer_size: int = 1024 * 1024, level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION, threads: int = 1, queue_size: int = 2): - if level < 0 or level > 3: - raise ValueError( - f"Invalid compression level, " - f"level should be between 0 and 3: {level}") self.lock = threading.Lock() self.exception: Optional[Exception] = None self.raw = fp self.level = level self.previous_block = b"" + self.compressors: List[isal_zlib._ParallelCompress] = [ + isal_zlib._ParallelCompress(buffersize=buffer_size, + level=level) for _ in range(threads) + ] self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [ queue.Queue(queue_size) for _ in range(threads)] self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ @@ -289,6 +299,7 @@ def closed(self) -> bool: def _compress(self, index: int): in_queue = self.input_queues[index] out_queue = self.output_queues[index] + compressor: isal_zlib._ParallelCompress = self.compressors[index] while True: try: data, zdict = in_queue.get(timeout=0.05) @@ -297,8 +308,7 @@ def _compress(self, index: int): return continue try: - compressed, crc = isal_zlib._parallel_deflate_and_crc( - data, zdict, self.level) + compressed, crc = compressor.compress_and_crc(data, zdict) except Exception as e: with self.lock: self.exception = e diff --git a/src/isal/isal_zlib.pyi b/src/isal/isal_zlib.pyi index d2126932..9ed725a2 100644 --- a/src/isal/isal_zlib.pyi +++ b/src/isal/isal_zlib.pyi @@ -38,9 +38,9 @@ def adler32(__data, __value: int = 1) -> int: ... def crc32(__data, __value: int = 0) -> int: ... def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ... -def _parallel_deflate_and_crc(__data, __zdict, level - ) -> typing.Tuple[bytes, int]: ... - +class _ParallelCompress: + def __init__(self, buffersize: int, level: int): ... + def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ... def compress(__data, level: int = ISAL_DEFAULT_COMPRESSION, diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index a2fff22b..3bef9b77 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -288,8 +288,75 @@ isal_zlib_crc32_combine(PyObject *module, PyObject *args) { crc32_comb(crc1, crc2, crc2_length) & 0xFFFFFFFF); } -PyDoc_STRVAR(isal_zlib_parallel_deflate_and_crc__doc__, -"parallel_deflate_and_crc($module, data, zdict, /, level)\n" + +typedef struct { + PyObject_HEAD + uint8_t *buffer; + uint32_t buffer_size; + struct isal_zstream zst; +} ParallelCompress; + +static void +ParallelCompress_dealloc(ParallelCompress *self) +{ + PyMem_Free(self->buffer); + PyMem_Free(self->zst.level_buf); + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject * +ParallelCompress__new__(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + Py_ssize_t buffer_size = 0; + int level = ISAL_DEFAULT_COMPRESSION; + static char *format = "n|i:ParallelCompress__new__"; + static char *kwarg_names[] = {"buffersize", "level", NULL}; + if (PyArg_ParseTupleAndKeywords(args, kwargs, format, kwarg_names, + &buffer_size, &level) < 0) { + return NULL; + } + uint32_t level_buf_size; + if (mem_level_to_bufsize(level, MEM_LEVEL_DEFAULT, &level_buf_size) < 0) { + PyErr_Format(PyExc_ValueError, "Invalid compression level %d", level); + return NULL; + } + if (buffer_size > UINT32_MAX) { + PyErr_Format(PyExc_ValueError, + "buffersize must be at most %zd, got %zd", + (Py_ssize_t)UINT32_MAX, buffer_size); + } + ParallelCompress *self = PyObject_New(ParallelCompress, type); + if (self == NULL) { + return PyErr_NoMemory(); + } + self->buffer = NULL; + self->zst.level_buf = NULL; + isal_deflate_init(&self->zst); + uint8_t *level_buf = PyMem_Malloc(level_buf_size); + if (level_buf == NULL) { + Py_DECREF(self); + return PyErr_NoMemory(); + } + uint8_t *buffer = PyMem_Malloc(buffer_size); + if (buffer == NULL) { + Py_DECREF(self); + PyMem_Free(level_buf); + return PyErr_NoMemory(); + } + self->buffer = buffer; + self->buffer_size = buffer_size; + self->zst.level_buf = level_buf; + self->zst.level_buf_size = level_buf_size; + self->zst.gzip_flag = IGZIP_DEFLATE; + self->zst.hist_bits = ISAL_DEF_MAX_HIST_BITS; + self->zst.level = (uint32_t)level; + self->zst.flush = SYNC_FLUSH; + return (PyObject *)self; +} + + +PyDoc_STRVAR(ParallelCompress_compress_and_crc__doc__, +"compress_and_crc($self, data, zdict, /)\n" "--\n" "\n" "Function specifically designed for use in parallel compression. Data is \n" @@ -301,28 +368,20 @@ PyDoc_STRVAR(isal_zlib_parallel_deflate_and_crc__doc__, " bytes-like object containing the to be compressed data\n" " zdict\n" " last 32 bytes of the previous block\n" -" level\n" -" the compression level to use.\n" ); -#define ISAL_ZLIB_PARALLEL_DEFLATE_AND_CRC_METHODDEF \ - { \ - "_parallel_deflate_and_crc", (PyCFunction)(void (*)(void))isal_zlib_parallel_deflate_and_crc, \ - METH_VARARGS | METH_KEYWORDS, isal_zlib_parallel_deflate_and_crc__doc__ \ - } +#define PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF \ + { \ + "compress_and_crc", (PyCFunction)ParallelCompress_compress_and_crc, \ + METH_VARARGS, ParallelCompress_compress_and_crc__doc__} static PyObject * -isal_zlib_parallel_deflate_and_crc(PyObject *module, PyObject *args, PyObject *kwargs) +ParallelCompress_compress_and_crc(ParallelCompress *self, PyObject *args) { Py_buffer data; Py_buffer zdict; - int level = ISAL_DEFAULT_COMPRESSION; - static char *keywords[] = {"", "", "level"}; - static char *format = "y*y*|i:isal_zlib.parallel_deflate_and_crc"; - PyObject *out_bytes = NULL; - uint8_t *level_buf = NULL; + static char *format = "y*y*:compress_and_crc"; - if (PyArg_ParseTupleAndKeywords( - args, kwargs, format, keywords, &data, &zdict, &level) < 0) { + if (PyArg_ParseTuple(args, format, &data, &zdict) < 0) { return NULL; } @@ -331,84 +390,69 @@ isal_zlib_parallel_deflate_and_crc(PyObject *module, PyObject *args, PyObject *k "Can only compress %d bytes of data", UINT32_MAX); goto error; } - - uint32_t level_buf_size; - if (mem_level_to_bufsize(level, MEM_LEVEL_DEFAULT, &level_buf_size) != 0) { - PyErr_SetString(IsalError, "Invalid compression level"); - goto error; - } - - level_buf = (uint8_t *)PyMem_Malloc(level_buf_size); - if (level_buf == NULL) { - PyErr_NoMemory(); - goto error; - } - // Assume output size < input_size. But just to be sure add 350 safety - // bytes per 64K of input. - Py_ssize_t output_size = data.len + (((data.len >> 16) + 1) * 350); - if (output_size > UINT32_MAX) { - PyErr_SetNone(PyExc_OverflowError); - goto error; - } - out_bytes = PyBytes_FromStringAndSize(NULL, output_size); - if (out_bytes == NULL) { - PyErr_NoMemory(); - goto error; - } - uint8_t *out_ptr = (uint8_t *)PyBytes_AS_STRING(out_bytes); - int err; - struct isal_zstream zst; - isal_deflate_init(&zst); - zst.level = (uint32_t)level; - zst.level_buf = level_buf; - zst.level_buf_size = level_buf_size; - zst.hist_bits = ISAL_DEF_MAX_HIST_BITS; - zst.gzip_flag = IGZIP_DEFLATE; - zst.avail_in = data.len; - zst.next_in = data.buf; - zst.next_out = out_ptr; - zst.avail_out = output_size; - zst.flush = SYNC_FLUSH; - err = isal_deflate_set_dict(&zst, zdict.buf, zdict.len); + isal_deflate_reset(&self->zst); + self->zst.avail_in = data.len; + self->zst.next_in = data.buf; + self->zst.next_out = self->buffer; + self->zst.avail_out = self->buffer_size; + PyThreadState *_save; + Py_UNBLOCK_THREADS + int err = isal_deflate_set_dict(&self->zst, zdict.buf, zdict.len); if (err != 0){ + Py_BLOCK_THREADS; isal_deflate_error(err); - return NULL; + goto error; } - uint32_t crc; - Py_BEGIN_ALLOW_THREADS - err = isal_deflate(&zst); - crc = crc32_gzip_refl(0, data.buf, data.len); - Py_END_ALLOW_THREADS + err = isal_deflate(&self->zst); + uint32_t crc = crc32_gzip_refl(0, data.buf, data.len); + Py_BLOCK_THREADS; if (err != COMP_OK) { isal_deflate_error(err); goto error; } - if (zst.avail_in != 0) { + if (self->zst.avail_out == 0) { + PyErr_Format( + PyExc_OverflowError, + "Compressed output exceeds buffer size of %u", self->buffer_size + ); + goto error; + } + if (self->zst.avail_in != 0) { PyErr_Format( PyExc_RuntimeError, "Developer error input bytes are still available: %u. " "Please contact the developers by creating an issue at " "https://github.com/pycompression/python-isal/issues", - zst.avail_in); + self->zst.avail_in); goto error; } - - if (_PyBytes_Resize(&out_bytes, zst.next_out - out_ptr) < 0) { + PyObject *out_bytes = PyBytes_FromStringAndSize( + (char *)self->buffer, self->zst.next_out - self->buffer); + if (out_bytes == NULL) { goto error; } - PyBuffer_Release(&data); - PyBuffer_Release(&zdict); - PyMem_Free(level_buf); return Py_BuildValue("(OI)", out_bytes, crc); error: - PyMem_Free(level_buf); - Py_XDECREF(out_bytes); PyBuffer_Release(&data); PyBuffer_Release(&zdict); return NULL; } +static PyMethodDef ParallelCompress_methods[] = { + PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF, + {NULL}, +}; + +static PyTypeObject ParallelCompress_Type = { + .tp_name = "isal_zlib._ParallelCompress", + .tp_basicsize = sizeof(ParallelCompress), + .tp_doc = PyDoc_STR( + "A reusable zstream and buffer fast parallel compression."), + .tp_dealloc = (destructor)ParallelCompress_dealloc, + .tp_new = ParallelCompress__new__, + .tp_methods = ParallelCompress_methods, +}; PyDoc_STRVAR(zlib_compress__doc__, "compress($module, data, /, level=ISAL_DEFAULT_COMPRESSION, wbits=MAX_WBITS)\n" @@ -2094,7 +2138,6 @@ static PyMethodDef IsalZlibMethods[] = { ISAL_ZLIB_ADLER32_METHODDEF, ISAL_ZLIB_CRC32_METHODDEF, ISAL_ZLIB_CRC32_COMBINE_METHODDEF, - ISAL_ZLIB_PARALLEL_DEFLATE_AND_CRC_METHODDEF, ISAL_ZLIB_COMPRESS_METHODDEF, ISAL_ZLIB_DECOMPRESS_METHODDEF, ISAL_ZLIB_COMPRESSOBJ_METHODDEF, @@ -2178,6 +2221,15 @@ PyInit_isal_zlib(void) return NULL; } + if (PyType_Ready(&ParallelCompress_Type) != 0) { + return NULL; + } + Py_INCREF(&ParallelCompress_Type); + if (PyModule_AddObject(m, "_ParallelCompress", + (PyObject *)&ParallelCompress_Type) < 0) { + return NULL; + } + PyModule_AddIntConstant(m, "MAX_WBITS", ISAL_DEF_MAX_HIST_BITS); PyModule_AddIntConstant(m, "DEFLATED", Z_DEFLATED); PyModule_AddIntMacro(m, DEF_MEM_LEVEL); diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index 7ae136e0..8a9390e8 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -7,6 +7,7 @@ import gzip import io +import random import tempfile from pathlib import Path @@ -73,15 +74,13 @@ def test_threaded_read_error(): @pytest.mark.timeout(5) -def test_threaded_write_error(monkeypatch): - tmp = tempfile.mktemp() +def test_threaded_write_error(): # parallel_deflate_and_crc method is called in a worker thread. - monkeypatch.delattr(igzip_threaded.isal_zlib, - "_parallel_deflate_and_crc") - with pytest.raises(AttributeError) as error: - with igzip_threaded.open(tmp, "wb", compresslevel=3) as writer: - writer.write(b"x") - error.match("no attribute '_parallel_deflate_and_crc'") + with pytest.raises(OverflowError) as error: + with igzip_threaded.open( + io.BytesIO(), "wb", compresslevel=3) as writer: + writer.write(random.randbytes(1024 * 1024 * 50)) + error.match("Compressed output exceeds buffer size") def test_close_reader(): From 048cc1d17cc29f26a8eb2bb8e08a82a380ce588c Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 10 Oct 2023 11:57:03 +0200 Subject: [PATCH 04/24] Fix memory leak --- src/isal/isal_zlibmodule.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index 3bef9b77..7588b1af 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -432,7 +432,11 @@ ParallelCompress_compress_and_crc(ParallelCompress *self, PyObject *args) if (out_bytes == NULL) { goto error; } - return Py_BuildValue("(OI)", out_bytes, crc); + PyBuffer_Release(&data); + PyBuffer_Release(&zdict); + PyObject *ret= Py_BuildValue("(OI)", out_bytes, crc); + Py_DECREF(out_bytes); + return ret; error: PyBuffer_Release(&data); PyBuffer_Release(&zdict); From 795e79cb35094e258f9e38f468c22408c3339e78 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 10 Oct 2023 12:02:52 +0200 Subject: [PATCH 05/24] Calculate crc on the fly --- src/isal/isal_zlibmodule.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index 7588b1af..db5724a3 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -347,7 +347,7 @@ ParallelCompress__new__(PyTypeObject *type, PyObject *args, PyObject *kwargs) self->buffer_size = buffer_size; self->zst.level_buf = level_buf; self->zst.level_buf_size = level_buf_size; - self->zst.gzip_flag = IGZIP_DEFLATE; + self->zst.gzip_flag = IGZIP_GZIP_NO_HDR; self->zst.hist_bits = ISAL_DEF_MAX_HIST_BITS; self->zst.level = (uint32_t)level; self->zst.flush = SYNC_FLUSH; @@ -404,7 +404,6 @@ ParallelCompress_compress_and_crc(ParallelCompress *self, PyObject *args) goto error; } err = isal_deflate(&self->zst); - uint32_t crc = crc32_gzip_refl(0, data.buf, data.len); Py_BLOCK_THREADS; if (err != COMP_OK) { @@ -434,7 +433,7 @@ ParallelCompress_compress_and_crc(ParallelCompress *self, PyObject *args) } PyBuffer_Release(&data); PyBuffer_Release(&zdict); - PyObject *ret= Py_BuildValue("(OI)", out_bytes, crc); + PyObject *ret= Py_BuildValue("(OI)", out_bytes, self->zst.internal_state.crc); Py_DECREF(out_bytes); return ret; error: From 0214e1224b85e189d894a5abea1a04d9947c8452 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 10 Oct 2023 12:12:08 +0200 Subject: [PATCH 06/24] Make sure writebuffersize is set --- src/isal/igzip_threaded.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index d135deb2..5529f851 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -74,7 +74,7 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, buffer_size=compress_buffer_size, level=compresslevel, threads=threads), - buffer_size=1024 * 1024 + buffer_size=write_buffer_size ) if "t" in mode: return io.TextIOWrapper(gzip_file, encoding, errors, newline) From 1278c7b09c80c9459042ae0cf178414cc468a1b8 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 10 Oct 2023 15:30:53 +0200 Subject: [PATCH 07/24] Read smaller blocks and store less blocks in queue to reduce memory size for reading --- src/isal/igzip_threaded.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 5529f851..39a3923e 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -82,7 +82,7 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, class _ThreadedGzipReader(io.RawIOBase): - def __init__(self, fp, queue_size=4, block_size=8 * 1024 * 1024): + def __init__(self, fp, queue_size=2, block_size=1024 * 1024): self.raw = fp self.fileobj = igzip._IGzipReader(fp, buffersize=8 * 1024 * 1024) self.pos = 0 From 49ab67513598dc244e42458b760645e2a310d4e4 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 10 Oct 2023 15:34:35 +0200 Subject: [PATCH 08/24] Make block size configurable --- src/isal/igzip_threaded.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 39a3923e..a37163f6 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -20,7 +20,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, - encoding=None, errors=None, newline=None, *, threads=1): + encoding=None, errors=None, newline=None, *, threads=1, + block_size=1024 * 1024): """ Utilize threads to read and write gzip objects and escape the GIL. Comparable to gzip.open. This method is only usable for streamed reading @@ -39,6 +40,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, :param threads: If 0 will defer to igzip.open, if < 0 will use all threads available to the system. Reading gzip can only use one thread. + :param block_size: Determines how large the blocks in the read/write + queues are for threaded reading and writing. :return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper, depending on the mode. """ @@ -61,20 +64,21 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, else: raise TypeError("filename must be a str or bytes object, or a file") if "r" in mode: - gzip_file = io.BufferedReader(_ThreadedGzipReader(binary_file)) + gzip_file = io.BufferedReader( + _ThreadedGzipReader(binary_file, block_size=block_size)) else: - write_buffer_size = 1024 * 1024 # Deflating random data results in an output a little larger than the # input. Making the output buffer 10% larger is sufficient overkill. - compress_buffer_size = write_buffer_size + max( - write_buffer_size // 10, 500) + compress_buffer_size = block_size + max( + block_size // 10, 500) gzip_file = io.BufferedWriter( _ThreadedGzipWriter( fp=binary_file, buffer_size=compress_buffer_size, level=compresslevel, - threads=threads), - buffer_size=write_buffer_size + threads=threads + ), + buffer_size=block_size ) if "t" in mode: return io.TextIOWrapper(gzip_file, encoding, errors, newline) @@ -84,7 +88,7 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, class _ThreadedGzipReader(io.RawIOBase): def __init__(self, fp, queue_size=2, block_size=1024 * 1024): self.raw = fp - self.fileobj = igzip._IGzipReader(fp, buffersize=8 * 1024 * 1024) + self.fileobj = igzip._IGzipReader(fp, buffersize=8 * block_size) self.pos = 0 self.read_file = False self.queue = queue.Queue(queue_size) From dff95a352a62774dcc23d6dfa168b79ab9465ee5 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 10 Oct 2023 18:49:48 +0200 Subject: [PATCH 09/24] Use a smaller queue for better characteristics --- src/isal/igzip_threaded.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index a37163f6..75252e28 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -198,7 +198,7 @@ def __init__(self, buffer_size: int = 1024 * 1024, level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION, threads: int = 1, - queue_size: int = 2): + queue_size: int = 1): self.lock = threading.Lock() self.exception: Optional[Exception] = None self.raw = fp From 424a5b91b7dec5c0b69ad7b964e9f5413c9d8029 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 07:36:08 +0200 Subject: [PATCH 10/24] Combine queues and threads for threads equals 1, for greater efficiency --- src/isal/igzip_threaded.py | 85 ++++++++++++++++++++++++++++---------- 1 file changed, 64 insertions(+), 21 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 75252e28..285cc737 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -192,6 +192,9 @@ class _ThreadedGzipWriter(io.RawIOBase): The writer thread reads from output queues and uses the crc32_combine function to calculate the total crc. It also writes the compressed block. + + When only one thread is requested, only the input queue is used and + compressing and output is handled in one thread. """ def __init__(self, fp: BinaryIO, @@ -208,20 +211,29 @@ def __init__(self, isal_zlib._ParallelCompress(buffersize=buffer_size, level=level) for _ in range(threads) ] - self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [ - queue.Queue(queue_size) for _ in range(threads)] - self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ - queue.Queue(queue_size) for _ in range(threads)] - self.index = 0 + if threads > 1: + self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [ + queue.Queue(queue_size) for _ in range(threads)] + self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ + queue.Queue(queue_size) for _ in range(threads)] + self.output_worker = threading.Thread(target=self._write) + self.compression_workers = [ + threading.Thread(target=self._compress, args=(i,)) + for i in range(threads) + ] + elif threads == 1: + self.input_queues = [queue.Queue(queue_size)] + self.output_queues = [] + self.compression_workers = [] + self.output_worker = threading.Thread( + target=self._compress_and_write) + else: + raise ValueError(f"threads should be 1 or greater, got {threads}") self.threads = threads + self.index = 0 self._crc = 0 self.running = False self._size = 0 - self.output_worker = threading.Thread(target=self._write) - self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,)) - for i in range(threads) - ] self._closed = False self._write_gzip_header() self.start() @@ -314,17 +326,9 @@ def _compress(self, index: int): try: compressed, crc = compressor.compress_and_crc(data, zdict) except Exception as e: - with self.lock: - self.exception = e - # Abort everything and empty the queue - in_queue.task_done() - self.running = False - while True: - try: - _ = in_queue.get(timeout=0.05) - in_queue.task_done() - except queue.Empty: - return + in_queue.task_done() + self._set_error_and_empty_queue(e, in_queue) + return data_length = len(data) out_queue.put((compressed, crc, data_length)) in_queue.task_done() @@ -352,5 +356,44 @@ def _write(self): output_queue.task_done() index += 1 + def _compress_and_write(self): + if not self.threads == 1: + raise SystemError("Compress_and_write is for one thread only") + fp = self.raw + total_crc = 0 + size = 0 + in_queue = self.input_queues[0] + compressor = self.compressors[0] + while True: + try: + data, zdict = in_queue.get(timeout=0.05) + except queue.Empty: + if not self.running: + return + continue + try: + compressed, crc = compressor.compress_and_crc(data, zdict) + except Exception as e: + in_queue.task_done() + self._set_error_and_empty_queue(e, in_queue) + return + data_length = len(data) + total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length) + size += data_length + fp.write(compressed) + in_queue.task_done() + + def _set_error_and_empty_queue(self, error, q): + with self.lock: + self.exception = error + # Abort everything and empty the queue + self.running = False + while True: + try: + _ = q.get(timeout=0.05) + q.task_done() + except queue.Empty: + return + def writable(self) -> bool: return True From 1509a7ab11c836f16e1be3fd6f2a7c0b0ffdefcf Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 07:49:25 +0200 Subject: [PATCH 11/24] Parametrize tests for threaded writer so both cases are tested --- src/isal/igzip_threaded.py | 4 +++- tests/test_igzip_threaded.py | 32 +++++++++++++++++++++++--------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 285cc737..f8807ff6 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -228,7 +228,7 @@ def __init__(self, self.output_worker = threading.Thread( target=self._compress_and_write) else: - raise ValueError(f"threads should be 1 or greater, got {threads}") + raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads self.index = 0 self._crc = 0 @@ -369,6 +369,8 @@ def _compress_and_write(self): data, zdict = in_queue.get(timeout=0.05) except queue.Empty: if not self.running: + self._crc = total_crc + self._size = size return continue try: diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index 8a9390e8..d36b19f7 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -7,6 +7,7 @@ import gzip import io +import itertools import random import tempfile from pathlib import Path @@ -26,10 +27,11 @@ def test_threaded_read(): assert thread_data == data -@pytest.mark.parametrize("mode", ["wb", "wt"]) -def test_threaded_write(mode): +@pytest.mark.parametrize(["mode", "threads"], + itertools.product(["wb", "wt"], [1, 3])) +def test_threaded_write(mode, threads): with tempfile.NamedTemporaryFile("wb", delete=False) as tmp: - with igzip_threaded.open(tmp, mode, threads=-1) as out_file: + with igzip_threaded.open(tmp, mode, threads=threads) as out_file: gzip_open_mode = "rb" if "b" in mode else "rt" with gzip.open(TEST_FILE, gzip_open_mode) as in_file: while True: @@ -74,11 +76,13 @@ def test_threaded_read_error(): @pytest.mark.timeout(5) -def test_threaded_write_error(): +@pytest.mark.parametrize("threads", [1, 3]) +def test_threaded_write_error(threads): # parallel_deflate_and_crc method is called in a worker thread. with pytest.raises(OverflowError) as error: with igzip_threaded.open( - io.BytesIO(), "wb", compresslevel=3) as writer: + io.BytesIO(), "wb", compresslevel=3, threads=threads + ) as writer: writer.write(random.randbytes(1024 * 1024 * 50)) error.match("Compressed output exceeds buffer size") @@ -92,8 +96,10 @@ def test_close_reader(): f.close() -def test_close_writer(): - f = igzip_threaded._ThreadedGzipWriter(io.BytesIO()) +@pytest.mark.parametrize("threads", [1, 3]) +def test_close_writer(threads): + f = igzip_threaded._ThreadedGzipWriter( + io.BytesIO(), threads=threads) f.close() assert f.closed # Make sure double closing does not raise errors @@ -117,6 +123,13 @@ def test_writer_wrong_level(): error.match("42") +def test_writer_too_low_threads(): + with pytest.raises(ValueError) as error: + igzip_threaded._ThreadedGzipWriter(io.BytesIO(), threads=0) + error.match("threads") + error.match("at least 1") + + def test_reader_read_after_close(): with open(TEST_FILE, "rb") as test_f: f = igzip_threaded._ThreadedGzipReader(test_f) @@ -126,8 +139,9 @@ def test_reader_read_after_close(): error.match("closed") -def test_writer_write_after_close(): - f = igzip_threaded._ThreadedGzipWriter(io.BytesIO()) +@pytest.mark.parametrize("threads", [1, 3]) +def test_writer_write_after_close(threads): + f = igzip_threaded._ThreadedGzipWriter(io.BytesIO(), threads=threads) f.close() with pytest.raises(ValueError) as error: f.write(b"abc") From 1107f475aee93ee2795f757f39cd715ad50908bf Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 08:43:14 +0200 Subject: [PATCH 12/24] Build tuple directly rather than using Py_BuildValue --- src/isal/isal_zlibmodule.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index db5724a3..f470b4cf 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -426,16 +426,19 @@ ParallelCompress_compress_and_crc(ParallelCompress *self, PyObject *args) self->zst.avail_in); goto error; } + PyObject *out_tup = PyTuple_New(2); + PyObject *crc_obj = PyLong_FromUnsignedLong(self->zst.internal_state.crc); PyObject *out_bytes = PyBytes_FromStringAndSize( (char *)self->buffer, self->zst.next_out - self->buffer); - if (out_bytes == NULL) { + if (out_bytes == NULL || out_tup == NULL || crc_obj == NULL) { + Py_XDECREF(out_bytes); Py_XDECREF(out_tup); Py_XDECREF(crc_obj); goto error; } PyBuffer_Release(&data); PyBuffer_Release(&zdict); - PyObject *ret= Py_BuildValue("(OI)", out_bytes, self->zst.internal_state.crc); - Py_DECREF(out_bytes); - return ret; + PyTuple_SET_ITEM(out_tup, 0, out_bytes); + PyTuple_SET_ITEM(out_tup, 1, crc_obj); + return out_tup; error: PyBuffer_Release(&data); PyBuffer_Release(&zdict); From 550e010a95154e6a015e29813fbf8b21dbc9c4f5 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 08:48:25 +0200 Subject: [PATCH 13/24] Use meth_fastcall for compress_and_crc --- src/isal/isal_zlibmodule.c | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index f470b4cf..080c1012 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -372,16 +372,27 @@ PyDoc_STRVAR(ParallelCompress_compress_and_crc__doc__, #define PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF \ { \ "compress_and_crc", (PyCFunction)ParallelCompress_compress_and_crc, \ - METH_VARARGS, ParallelCompress_compress_and_crc__doc__} + METH_FASTCALL, ParallelCompress_compress_and_crc__doc__} static PyObject * -ParallelCompress_compress_and_crc(ParallelCompress *self, PyObject *args) +ParallelCompress_compress_and_crc(ParallelCompress *self, + PyObject *const *args, + Py_ssize_t nargs) { + if (nargs != 2) { + PyErr_Format( + PyExc_TypeError, + "compress_and_crc takes exactly 2 arguments, got %zd", + nargs); + return NULL; + } Py_buffer data; Py_buffer zdict; - static char *format = "y*y*:compress_and_crc"; - - if (PyArg_ParseTuple(args, format, &data, &zdict) < 0) { + if (PyObject_GetBuffer(args[0], &data, PyBUF_SIMPLE) == -1) { + return NULL; + } + if (PyObject_GetBuffer(args[1], &zdict, PyBUF_SIMPLE) == -1) { + PyBuffer_Release(&data); return NULL; } From 59620faeebab730dbe9c9e0d333cb82e14f84583 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 08:48:49 +0200 Subject: [PATCH 14/24] Make sure zdict cannot be excessively large --- src/isal/isal_zlibmodule.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index 080c1012..2ee10bda 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -396,7 +396,7 @@ ParallelCompress_compress_and_crc(ParallelCompress *self, return NULL; } - if (data.len > UINT32_MAX) { + if (data.len + zdict.len > UINT32_MAX) { PyErr_Format(PyExc_OverflowError, "Can only compress %d bytes of data", UINT32_MAX); goto error; From 0d1cd1003d180176e8b0254891fe28a7e1a3f50d Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 08:49:35 +0200 Subject: [PATCH 15/24] Unblock threads at the earliest possible convenience --- src/isal/isal_zlibmodule.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/isal/isal_zlibmodule.c b/src/isal/isal_zlibmodule.c index 2ee10bda..33f796bf 100644 --- a/src/isal/isal_zlibmodule.c +++ b/src/isal/isal_zlibmodule.c @@ -401,13 +401,13 @@ ParallelCompress_compress_and_crc(ParallelCompress *self, "Can only compress %d bytes of data", UINT32_MAX); goto error; } + PyThreadState *_save; + Py_UNBLOCK_THREADS isal_deflate_reset(&self->zst); self->zst.avail_in = data.len; self->zst.next_in = data.buf; self->zst.next_out = self->buffer; self->zst.avail_out = self->buffer_size; - PyThreadState *_save; - Py_UNBLOCK_THREADS int err = isal_deflate_set_dict(&self->zst, zdict.buf, zdict.len); if (err != 0){ Py_BLOCK_THREADS; From 8327101531016d030c673b72bfe627a1c96b0bf9 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 08:52:20 +0200 Subject: [PATCH 16/24] Update changelog with changes for threaded writing --- CHANGELOG.rst | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index b042ead1..fac4ed78 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,14 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +version 1.5.0-dev +----------------- ++ Make a special case for threads==1 in ``igzip_threaded.open`` for writing + files. This now combines the writing and compression thread for less + overhead. ++ Write a specialized function for compressing blocks in a threaded fashion. + This function maximizes time spent outside the GIL. + version 1.4.1 ----------------- + Fix several errors related to unclosed files and buffers. From 0226aa0fdd78caa911f43ad6f9502b587a3fd48f Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 09:14:56 +0200 Subject: [PATCH 17/24] Set buffer_size at latest position for backwards compatibility --- src/isal/igzip_threaded.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index f8807ff6..7a3d24af 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -198,10 +198,11 @@ class _ThreadedGzipWriter(io.RawIOBase): """ def __init__(self, fp: BinaryIO, - buffer_size: int = 1024 * 1024, level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION, threads: int = 1, - queue_size: int = 1): + queue_size: int = 1, + buffer_size: int = 1024 * 1024, + ): self.lock = threading.Lock() self.exception: Optional[Exception] = None self.raw = fp From e8fd360462985439253e47d6e3315a7be6c05007 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 09:17:24 +0200 Subject: [PATCH 18/24] use os.urandom rather than random.randbytes --- tests/test_igzip_threaded.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index d36b19f7..d6430dc8 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -8,7 +8,7 @@ import gzip import io import itertools -import random +import os import tempfile from pathlib import Path @@ -83,7 +83,7 @@ def test_threaded_write_error(threads): with igzip_threaded.open( io.BytesIO(), "wb", compresslevel=3, threads=threads ) as writer: - writer.write(random.randbytes(1024 * 1024 * 50)) + writer.write(os.urandom(1024 * 1024 * 50)) error.match("Compressed output exceeds buffer size") From 39d31e0c48064750be91a81171195ed870f9d9ef Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 21:02:36 +0200 Subject: [PATCH 19/24] Add tests for oversized blocks --- tests/test_igzip_threaded.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index d6430dc8..e85d1101 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -75,15 +75,26 @@ def test_threaded_read_error(): tr_f.read() +@pytest.mark.timeout(5) +@pytest.mark.parametrize("threads", [1, 3]) +def test_threaded_write_oversized_block_no_error(threads): + with igzip_threaded.open( + io.BytesIO(), "wb", compresslevel=3, threads=threads, + block_size=8 * 1024 + ) as writer: + writer.write(os.urandom(1024 * 64)) + + @pytest.mark.timeout(5) @pytest.mark.parametrize("threads", [1, 3]) def test_threaded_write_error(threads): - # parallel_deflate_and_crc method is called in a worker thread. + f = igzip_threaded._ThreadedGzipWriter( + fp=io.BytesIO(), level=3, + threads=threads, buffer_size=8 * 1024) + # Bypass the write method which should not allow this. + f.input_queues[0].put((os.urandom(1024 * 64), b"")) with pytest.raises(OverflowError) as error: - with igzip_threaded.open( - io.BytesIO(), "wb", compresslevel=3, threads=threads - ) as writer: - writer.write(os.urandom(1024 * 1024 * 50)) + f.close() error.match("Compressed output exceeds buffer size") From c80645d6b2ab72e0bdb9207ccdd08fa998ab1606 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 21:40:24 +0200 Subject: [PATCH 20/24] Test if too big blocks get written correctly --- src/isal/igzip_threaded.py | 27 +++++++++++++++++++-------- tests/test_igzip_threaded.py | 23 ++++++++++++++++------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 7a3d24af..20142e7a 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -67,14 +67,10 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, gzip_file = io.BufferedReader( _ThreadedGzipReader(binary_file, block_size=block_size)) else: - # Deflating random data results in an output a little larger than the - # input. Making the output buffer 10% larger is sufficient overkill. - compress_buffer_size = block_size + max( - block_size // 10, 500) gzip_file = io.BufferedWriter( _ThreadedGzipWriter( fp=binary_file, - buffer_size=compress_buffer_size, + block_size=block_size, level=compresslevel, threads=threads ), @@ -201,15 +197,19 @@ def __init__(self, level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION, threads: int = 1, queue_size: int = 1, - buffer_size: int = 1024 * 1024, + block_size: int = 1024 * 1024, ): self.lock = threading.Lock() self.exception: Optional[Exception] = None self.raw = fp self.level = level self.previous_block = b"" + # Deflating random data results in an output a little larger than the + # input. Making the output buffer 10% larger is sufficient overkill. + compress_buffer_size = block_size + max(block_size // 10, 500) + self.block_size = block_size self.compressors: List[isal_zlib._ParallelCompress] = [ - isal_zlib._ParallelCompress(buffersize=buffer_size, + isal_zlib._ParallelCompress(buffersize=compress_buffer_size, level=level) for _ in range(threads) ] if threads > 1: @@ -273,8 +273,19 @@ def write(self, b) -> int: with self.lock: if self.exception: raise self.exception - index = self.index + length = b.nbytes if isinstance(b, memoryview) else len(b) + if length > self.block_size: + # write smaller chunks and return the result + memview = memoryview(b) + start = 0 + total_written = 0 + while start < length: + total_written += self.write( + memview[start:start+self.block_size]) + start += self.block_size + return total_written data = bytes(b) + index = self.index zdict = memoryview(self.previous_block)[-DEFLATE_WINDOW_SIZE:] self.previous_block = data self.index += 1 diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index e85d1101..d1ed7315 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -78,11 +78,19 @@ def test_threaded_read_error(): @pytest.mark.timeout(5) @pytest.mark.parametrize("threads", [1, 3]) def test_threaded_write_oversized_block_no_error(threads): - with igzip_threaded.open( - io.BytesIO(), "wb", compresslevel=3, threads=threads, - block_size=8 * 1024 - ) as writer: - writer.write(os.urandom(1024 * 64)) + # Random bytes are incompressible, and therefore are guaranteed to + # trigger a buffer overflow when larger than block size unless handled + # correctly. + data = os.urandom(1024 * 63) # not a multiple of block_size + with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp: + with igzip_threaded.open( + tmp, "wb", compresslevel=3, threads=threads, + block_size=8 * 1024 + ) as writer: + writer.write(data) + with gzip.open(tmp.name, "rb") as gzipped: + decompressed = gzipped.read() + assert data == decompressed @pytest.mark.timeout(5) @@ -90,8 +98,9 @@ def test_threaded_write_oversized_block_no_error(threads): def test_threaded_write_error(threads): f = igzip_threaded._ThreadedGzipWriter( fp=io.BytesIO(), level=3, - threads=threads, buffer_size=8 * 1024) - # Bypass the write method which should not allow this. + threads=threads, block_size=8 * 1024) + # Bypass the write method which should not allow blocks larger than + # block_size. f.input_queues[0].put((os.urandom(1024 * 64), b"")) with pytest.raises(OverflowError) as error: f.close() From d42f0db68cb0867dc3c342780da5e8fed962496d Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Thu, 12 Oct 2023 06:59:42 +0200 Subject: [PATCH 21/24] Alter block size in test so multiple queues get tested --- tests/test_igzip_threaded.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index d1ed7315..7cd602c8 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -28,10 +28,12 @@ def test_threaded_read(): @pytest.mark.parametrize(["mode", "threads"], - itertools.product(["wb", "wt"], [1, 3])) + itertools.product(["wb", "wt"], [1, 3, 12])) def test_threaded_write(mode, threads): with tempfile.NamedTemporaryFile("wb", delete=False) as tmp: - with igzip_threaded.open(tmp, mode, threads=threads) as out_file: + # Use a small block size to simulate many writes. + with igzip_threaded.open(tmp, mode, threads=threads, + block_size=8*1024) as out_file: gzip_open_mode = "rb" if "b" in mode else "rt" with gzip.open(TEST_FILE, gzip_open_mode) as in_file: while True: From dc7f6dd23d7c465b6b2d721b57e20c11900940c7 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Thu, 12 Oct 2023 07:13:04 +0200 Subject: [PATCH 22/24] Make sure negative threads are tested --- tests/test_igzip_threaded.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index 7cd602c8..a0f581c6 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -28,7 +28,7 @@ def test_threaded_read(): @pytest.mark.parametrize(["mode", "threads"], - itertools.product(["wb", "wt"], [1, 3, 12])) + itertools.product(["wb", "wt"], [1, 3, -1])) def test_threaded_write(mode, threads): with tempfile.NamedTemporaryFile("wb", delete=False) as tmp: # Use a small block size to simulate many writes. From 4abcbdd21954e299af10da5dbd180fc0955bc6e7 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Oct 2023 19:25:25 +0200 Subject: [PATCH 23/24] Reword changelog to be more user oriented --- CHANGELOG.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index fac4ed78..3ae08f74 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -12,8 +12,8 @@ version 1.5.0-dev + Make a special case for threads==1 in ``igzip_threaded.open`` for writing files. This now combines the writing and compression thread for less overhead. -+ Write a specialized function for compressing blocks in a threaded fashion. - This function maximizes time spent outside the GIL. ++ Maximize time spent outside the GIL for ``igzip_threaded.open`` writing. + This has decreased wallclock time significantly. version 1.4.1 ----------------- From 74883cf40ba479d2f2473acb0e43d50b7f59b51f Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Thu, 12 Oct 2023 07:25:14 +0200 Subject: [PATCH 24/24] Set version 1.5.0 --- CHANGELOG.rst | 2 +- setup.py | 2 +- src/isal/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3ae08f74..62dbd1d7 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,7 +7,7 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. -version 1.5.0-dev +version 1.5.0 ----------------- + Make a special case for threads==1 in ``igzip_threaded.open`` for writing files. This now combines the writing and compression thread for less diff --git a/setup.py b/setup.py index 7aebac23..cd3cfcb3 100644 --- a/setup.py +++ b/setup.py @@ -135,7 +135,7 @@ def build_isa_l(): setup( name="isal", - version="1.5.0-dev", + version="1.5.0", description="Faster zlib and gzip compatible compression and " "decompression by providing python bindings for the ISA-L " "library.", diff --git a/src/isal/__init__.py b/src/isal/__init__.py index 64ba0dbb..95397094 100644 --- a/src/isal/__init__.py +++ b/src/isal/__init__.py @@ -27,4 +27,4 @@ "__version__" ] -__version__ = "1.5.0-dev" +__version__ = "1.5.0"