diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 4ac880b..b2b8f54 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -9,6 +9,9 @@ Changelog
 version 0.4.0-dev
 -----------------
++ Add a ``gzip_ng_threaded`` module that contains the ``gzip_ng_threaded.open``
+  function. This allows using multithreaded compression as well as escaping the
+  GIL.
 + The internal ``gzip_ng._GzipReader`` has been rewritten in C. As a result the
   overhead of decompressing files has been reduced significantly.
 + The ``gzip_ng._GzipReader`` in C is now used in ``gzip_ng.decompress``. The
diff --git a/README.rst b/README.rst
index eef79dc..e232dcf 100644
--- a/README.rst
+++ b/README.rst
@@ -42,7 +42,7 @@ by providing Python bindings for the zlib-ng library.
 This package provides Python bindings for the `zlib-ng
 <https://github.com/zlib-ng/zlib-ng>`_ library.
 
-``python-zlib-ng`` provides the bindings by offering two modules:
+``python-zlib-ng`` provides the bindings by offering three modules:
 
 + ``zlib_ng``: A drop-in replacement for the zlib module that uses zlib-ng
   to accelerate its performance.
@@ -51,6 +51,11 @@ This package provides Python bindings for the `zlib-ng
   instead of ``zlib`` to perform its compression and checksum tasks, which
   improves performance.
 
++ ``gzip_ng_threaded`` offers an ``open`` function which returns buffered read
+  or write streams that can be used to read and write large files while
+  escaping the GIL using one or multiple threads. This functionality only
+  works for streaming; seeking is not supported.
+
 ``zlib_ng`` and ``gzip_ng`` are almost fully compatible with ``zlib`` and
 ``gzip`` from the Python standard library. There are some minor differences;
 see: differences-with-zlib-and-gzip-modules_.
@@ -68,6 +73,7 @@ The python-zlib-ng modules can be imported as follows
 
     from zlib_ng import zlib_ng
     from zlib_ng import gzip_ng
+    from zlib_ng import gzip_ng_threaded
 
 ``zlib_ng`` and ``gzip_ng`` are meant to be used as drop-in replacements, so
 their API and functions are the same as the stdlib's modules.
diff --git a/docs/index.rst b/docs/index.rst
index fbc8261..8892cbd 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -113,6 +113,13 @@ API-documentation: zlib_ng.gzip_ng
    :members:
    :special-members: __init__
 
+===========================================
+API-documentation: zlib_ng.gzip_ng_threaded
+===========================================
+
+.. automodule:: zlib_ng.gzip_ng_threaded
+   :members: open
+
 ===============================
 python -m zlib_ng.gzip_ng usage
 ===============================
diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py
new file mode 100644
index 0000000..f8c0820
--- /dev/null
+++ b/src/zlib_ng/gzip_ng_threaded.py
@@ -0,0 +1,430 @@
+# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022
+# Python Software Foundation; All Rights Reserved
+
+# This file is part of python-zlib-ng which is distributed under the
+# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2.
+
+import builtins
+import io
+import multiprocessing
+import os
+import queue
+import struct
+import threading
+from typing import List, Optional, Tuple
+
+from . import gzip_ng, zlib_ng
+
+DEFLATE_WINDOW_SIZE = 2 ** 15
+
+
+def open(filename, mode="rb", compresslevel=gzip_ng._COMPRESS_LEVEL_TRADEOFF,
+         encoding=None, errors=None, newline=None, *, threads=1,
+         block_size=1024 * 1024):
+    """
+    Utilize threads to read and write gzip objects and escape the GIL.
+    Comparable to gzip.open.
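+
+    An illustrative sketch (the path and contents are hypothetical)::
+
+        with gzip_ng_threaded.open("example.txt.gz", "wt", threads=4) as f:
+            f.write("hello world\n")
+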
+    This method is only usable for streamed reading and writing of objects.
+    Seeking is not supported.
+
+    threads == 0 will defer to gzip_ng.open. threads < 0 will attempt to use
+    all threads available on the system.
+
+    :param filename: str, bytes or file-like object (supporting read or write
+                     method)
+    :param mode: the mode with which the file should be opened.
+    :param compresslevel: Compression level, only used for gzip writers.
+    :param encoding: Passed through to the io.TextIOWrapper, if applicable.
+    :param errors: Passed through to the io.TextIOWrapper, if applicable.
+    :param newline: Passed through to the io.TextIOWrapper, if applicable.
+    :param threads: If 0 will defer to gzip_ng.open, if < 0 will use all
+                    threads available to the system. Reading gzip can only
+                    use one thread.
+    :param block_size: Determines how large the blocks in the read/write
+                       queues are for threaded reading and writing.
+    :return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
+             depending on the mode.
+    """
+    if threads == 0:
+        return gzip_ng.open(filename, mode, compresslevel, encoding, errors,
+                            newline)
+    elif threads < 0:
+        try:
+            threads = len(os.sched_getaffinity(0))
+        except:  # noqa: E722
+            try:
+                threads = multiprocessing.cpu_count()
+            except:  # noqa: E722
+                threads = 1
+    if "r" in mode:
+        gzip_file = io.BufferedReader(
+            _ThreadedGzipReader(filename, block_size=block_size))
+    else:
+        gzip_file = io.BufferedWriter(
+            _ThreadedGzipWriter(
+                filename,
+                mode.replace("t", "b"),
+                block_size=block_size,
+                level=compresslevel,
+                threads=threads
+            ),
+            buffer_size=block_size
+        )
+    if "t" in mode:
+        return io.TextIOWrapper(gzip_file, encoding, errors, newline)
+    return gzip_file
+
+
+def open_as_binary_stream(filename, open_mode):
+    if isinstance(filename, (str, bytes)) or hasattr(filename, "__fspath__"):
+        binary_file = builtins.open(filename, open_mode)
+    elif hasattr(filename, "read") or hasattr(filename, "write"):
+        binary_file = filename
+    else:
+        raise TypeError("filename must be a str or bytes object, or a file")
+    return binary_file
+
+
+class _ThreadedGzipReader(io.RawIOBase):
+    def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
+        self.raw = open_as_binary_stream(filename, "rb")
+        self.fileobj = zlib_ng._GzipReader(self.raw, buffersize=8 * block_size)
+        self.pos = 0
+        self.read_file = False
+        self.queue = queue.Queue(queue_size)
+        self.eof = False
+        self.exception = None
+        self.buffer = io.BytesIO()
+        self.block_size = block_size
+        self.worker = threading.Thread(target=self._decompress)
+        self._closed = False
+        self.running = True
+        self.worker.start()
+
+    def _check_closed(self, msg=None):
+        if self._closed:
+            raise ValueError("I/O operation on closed file")
+
+    def _decompress(self):
+        block_size = self.block_size
+        block_queue = self.queue
+        while self.running:
+            try:
+                data = self.fileobj.read(block_size)
+            except Exception as e:
+                self.exception = e
+                return
+            if not data:
+                return
+            while self.running:
+                try:
+                    block_queue.put(data, timeout=0.05)
+                    break
+                except queue.Full:
+                    pass
+
+    def readinto(self, b):
+        self._check_closed()
+        result = self.buffer.readinto(b)
+        if result == 0:
+            while True:
+                try:
+                    data_from_queue = self.queue.get(timeout=0.01)
+                    break
+                except queue.Empty:
+                    if not self.worker.is_alive():
+                        if self.exception:
+                            raise self.exception
+                        # EOF reached
+                        return 0
+            self.buffer = io.BytesIO(data_from_queue)
+            result = self.buffer.readinto(b)
+        self.pos += result
+        return result
+
+    def readable(self) -> bool:
+        return True
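+
+    # All queue operations in this class poll with short timeouts instead of
+    # blocking indefinitely: the worker re-checks ``self.running`` between
+    # put attempts, and ``readinto`` re-checks worker liveness between get
+    # attempts, so close() cannot deadlock either side.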
+    def tell(self) -> int:
+        self._check_closed()
+        return self.pos
+
+    def close(self) -> None:
+        if self._closed:
+            return
+        self.running = False
+        self.worker.join()
+        self.fileobj.close()
+        self.raw.close()
+        self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+
+class _ThreadedGzipWriter(io.RawIOBase):
+    """
+    Write a gzip file using multiple threads.
+
+    This class is heavily inspired by pigz from Mark Adler
+    (https://github.com/madler/pigz). It works similarly.
+
+    Each thread gets its own input and output queue. Blocks of data are
+    distributed over the input queues in round-robin fashion using an index.
+    The writer thread reads from the output queues in the same round-robin
+    order. This way all the blocks are written to the output stream in order
+    while still allowing independent compression for each thread.
+
+    Writing to the _ThreadedGzipWriter happens on the main thread via an
+    io.BufferedWriter. The BufferedWriter will offer a memoryview of its
+    buffer. Using the bytes constructor this is made into an immutable block
+    of data.
+
+    A reference to the previous block is used to create a memoryview of the
+    last 32K of that block. This is used as a dictionary for the compression,
+    allowing for better compression rates.
+
+    The current block and the dictionary are pushed into an input queue. They
+    are picked up by a compression worker that calculates the crc32 and the
+    length of the data and compresses the block. The compressed block,
+    checksum and length are pushed into an output queue.
+
+    The writer thread reads from the output queues and uses the crc32_combine
+    function to calculate the total crc. It also writes the compressed block.
+
+    When only one thread is requested, only the input queue is used and
+    compression and output are handled in one thread.
+    """
+    def __init__(self,
+                 filename,
+                 mode: str = "wb",
+                 level: int = zlib_ng.Z_DEFAULT_COMPRESSION,
+                 threads: int = 1,
+                 queue_size: int = 1,
+                 block_size: int = 1024 * 1024,
+                 ):
+        # File should be closed during init, so __exit__ method does not
+        # touch the self.raw value before it is initialized.
+        self._closed = True
+        if "t" in mode or "r" in mode:
+            raise ValueError("Only binary writing is supported")
+        if "b" not in mode:
+            mode += "b"
+        self.lock = threading.Lock()
+        self.exception: Optional[Exception] = None
+        self.level = level
+        self.previous_block = b""
+        # Deflating random data results in an output a little larger than the
+        # input. Making the output buffer 10% larger is sufficient overkill.
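+        # Stored (uncompressed) deflate blocks add only 5 bytes of header
+        # per 65,535 bytes of data, plus a few bytes for each Z_SYNC_FLUSH
+        # marker, so 10% with a 500-byte floor leaves ample room.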
+        compress_buffer_size = block_size + max(block_size // 10, 500)
+        self.block_size = block_size
+        self.compressors: List[zlib_ng._ParallelCompress] = [
+            zlib_ng._ParallelCompress(buffersize=compress_buffer_size,
+                                      level=level) for _ in range(threads)
+        ]
+        if threads > 1:
+            self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
+                queue.Queue(queue_size) for _ in range(threads)]
+            self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
+                queue.Queue(queue_size) for _ in range(threads)]
+            self.output_worker = threading.Thread(target=self._write)
+            self.compression_workers = [
+                threading.Thread(target=self._compress, args=(i,))
+                for i in range(threads)
+            ]
+        elif threads == 1:
+            self.input_queues = [queue.Queue(queue_size)]
+            self.output_queues = []
+            self.compression_workers = []
+            self.output_worker = threading.Thread(
+                target=self._compress_and_write)
+        else:
+            raise ValueError(f"threads should be at least 1, got {threads}")
+        self.threads = threads
+        self.index = 0
+        self._crc = 0
+        self.running = False
+        self._size = 0
+        self.raw = open_as_binary_stream(filename, mode)
+        self._closed = False
+        self._write_gzip_header()
+        self.start()
+
+    def _check_closed(self, msg=None):
+        if self._closed:
+            raise ValueError("I/O operation on closed file")
+
+    def _write_gzip_header(self):
+        """Simple gzip header. Only the xfl field is set according to level."""
+        magic1 = 0x1f
+        magic2 = 0x8b
+        method = 0x08
+        flags = 0
+        mtime = 0
+        os = 0xff
+        if self.level == zlib_ng.Z_BEST_COMPRESSION:
+            xfl = 2
+        elif self.level == zlib_ng.Z_BEST_SPEED:
+            xfl = 4
+        else:
+            xfl = 0
+        # Fields are little-endian; xfl precedes os in the gzip header.
+        self.raw.write(struct.pack(
+            "<BBBBIBB", magic1, magic2, method, flags, mtime, xfl, os))
+
+    def start(self):
+        self.running = True
+        self.output_worker.start()
+        for worker in self.compression_workers:
+            worker.start()
+
+    def stop(self):
+        """Stop, but do not care for remaining work"""
+        self.running = False
+        for worker in self.compression_workers:
+            worker.join()
+        self.output_worker.join()
+
+    def write(self, b) -> int:
+        self._check_closed()
+        with self.lock:
+            if self.exception:
+                raise self.exception
+        length = b.nbytes if isinstance(b, memoryview) else len(b)
+        if length > self.block_size:
+            # write smaller chunks and return the result
+            memview = memoryview(b)
+            start = 0
+            total_written = 0
+            while start < length:
+                total_written += self.write(
+                    memview[start:start+self.block_size])
+                start += self.block_size
+            return total_written
+        data = bytes(b)
+        index = self.index
+        zdict = memoryview(self.previous_block)[-DEFLATE_WINDOW_SIZE:]
+        self.previous_block = data
+        self.index += 1
+        worker_index = index % self.threads
+        self.input_queues[worker_index].put((data, zdict))
+        return len(data)
+
+    def flush(self):
+        self._check_closed()
+        # Wait for all data to be compressed
+        for in_q in self.input_queues:
+            in_q.join()
+        # Wait for all data to be written
+        for out_q in self.output_queues:
+            out_q.join()
+        self.raw.flush()
+
+    def close(self) -> None:
+        if self._closed:
+            return
+        self.flush()
+        self.stop()
+        if self.exception:
+            self.raw.close()
+            self._closed = True
+            raise self.exception
+        # Write an empty deflate block with a last block marker.
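+        # zlib_ng.compress with wbits=-15 emits a raw deflate stream; for
+        # empty input that is just a final (BFINAL=1) empty block, which
+        # terminates the stream built up from the sync-flushed blocks.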
+        self.raw.write(zlib_ng.compress(b"", wbits=-15))
+        trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
+        self.raw.write(trailer)
+        self.raw.flush()
+        self.raw.close()
+        self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def _compress(self, index: int):
+        in_queue = self.input_queues[index]
+        out_queue = self.output_queues[index]
+        compressor: zlib_ng._ParallelCompress = self.compressors[index]
+        while True:
+            try:
+                data, zdict = in_queue.get(timeout=0.05)
+            except queue.Empty:
+                if not self.running:
+                    return
+                continue
+            try:
+                compressed, crc = compressor.compress_and_crc(data, zdict)
+            except Exception as e:
+                in_queue.task_done()
+                self._set_error_and_empty_queue(e, in_queue)
+                return
+            data_length = len(data)
+            out_queue.put((compressed, crc, data_length))
+            in_queue.task_done()
+
+    def _write(self):
+        index = 0
+        output_queues = self.output_queues
+        fp = self.raw
+        total_crc = 0
+        size = 0
+        while True:
+            out_index = index % self.threads
+            output_queue = output_queues[out_index]
+            try:
+                compressed, crc, data_length = output_queue.get(timeout=0.05)
+            except queue.Empty:
+                if not self.running:
+                    self._crc = total_crc
+                    self._size = size
+                    return
+                continue
+            total_crc = zlib_ng.crc32_combine(total_crc, crc, data_length)
+            size += data_length
+            fp.write(compressed)
+            output_queue.task_done()
+            index += 1
+
+    def _compress_and_write(self):
+        if self.threads != 1:
+            raise SystemError("Compress_and_write is for one thread only")
+        fp = self.raw
+        total_crc = 0
+        size = 0
+        in_queue = self.input_queues[0]
+        compressor = self.compressors[0]
+        while True:
+            try:
+                data, zdict = in_queue.get(timeout=0.05)
+            except queue.Empty:
+                if not self.running:
+                    self._crc = total_crc
+                    self._size = size
+                    return
+                continue
+            try:
+                compressed, crc = compressor.compress_and_crc(data, zdict)
+            except Exception as e:
+                in_queue.task_done()
+                self._set_error_and_empty_queue(e, in_queue)
+                return
+            data_length = len(data)
+            total_crc = zlib_ng.crc32_combine(total_crc, crc, data_length)
+            size += data_length
+            fp.write(compressed)
+            in_queue.task_done()
+
+    def _set_error_and_empty_queue(self, error, q):
+        with self.lock:
+            self.exception = error
+            # Abort everything and empty the queue
+            self.running = False
+            while True:
+                try:
+                    _ = q.get(timeout=0.05)
+                    q.task_done()
+                except queue.Empty:
+                    return
+
+    def writable(self) -> bool:
+        return True
diff --git a/src/zlib_ng/zlib_ng.pyi b/src/zlib_ng/zlib_ng.pyi
index a11cae7..775340c 100644
--- a/src/zlib_ng/zlib_ng.pyi
+++ b/src/zlib_ng/zlib_ng.pyi
@@ -40,6 +40,7 @@ error: Exception
 
 def adler32(__data, __value: int = 1) -> int: ...
 def crc32(__data, __value: int = 0) -> int: ...
+def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ...
 
 def compress(__data,
              level: int = Z_DEFAULT_COMPRESSION,
@@ -69,6 +70,10 @@ def compressobj(level: int = Z_DEFAULT_COMPRESSION,
 def decompressobj(wbits: int = MAX_WBITS,
                   zdict = None) -> _Decompress: ...
 
+class _ParallelCompress:
+    def __init__(self, buffersize: int, level: int): ...
+    def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ...
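+# Intended usage (an illustrative sketch mirroring gzip_ng_threaded; one
+# instance per worker thread, since an instance reuses a single buffer):
+#
+#     compressor = _ParallelCompress(buffersize=2 * 1024 * 1024, level=6)
+#     compressed, crc = compressor.compress_and_crc(block, previous_tail)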
+
 class _ZlibDecompressor:
     unused_data: bytes
     needs_input: bool
diff --git a/src/zlib_ng/zlib_ngmodule.c b/src/zlib_ng/zlib_ngmodule.c
index b3cb943..d0e528a 100644
--- a/src/zlib_ng/zlib_ngmodule.c
+++ b/src/zlib_ng/zlib_ngmodule.c
@@ -1561,6 +1561,241 @@ zlib_crc32(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
     return return_value;
 }
 
+
+PyDoc_STRVAR(zlib_crc32_combine__doc__,
+"crc32_combine($module, crc1, crc2, crc2_length /)\n"
+"--\n"
+"\n"
+"Combine crc1 and crc2 into a new crc that is accurate for the combined data\n"
+"blocks that crc1 and crc2 were calculated from.\n"
+"\n"
+"  crc1\n"
+"    the first crc32 checksum\n"
+"  crc2\n"
+"    the second crc32 checksum\n"
+"  crc2_length\n"
+"    the length of the data block crc2 was calculated from\n"
+);
+
+
+#define ZLIB_CRC32_COMBINE_METHODDEF \
+    {"crc32_combine", (PyCFunction)(void(*)(void))zlib_crc32_combine, \
+     METH_VARARGS, zlib_crc32_combine__doc__}
+
+static PyObject *
+zlib_crc32_combine(PyObject *module, PyObject *args) {
+    uint32_t crc1 = 0;
+    uint32_t crc2 = 0;
+    Py_ssize_t crc2_length = 0;
+    static char *format = "IIn:crc32_combine";
+    if (!PyArg_ParseTuple(args, format, &crc1, &crc2, &crc2_length)) {
+        return NULL;
+    }
+    return PyLong_FromUnsignedLong(
+        zng_crc32_combine(crc1, crc2, crc2_length) & 0xFFFFFFFF);
+}
+
+typedef struct {
+    PyObject_HEAD
+    uint8_t *buffer;
+    uint32_t buffer_size;
+    zng_stream zst;
+    uint8_t is_initialised;
+} ParallelCompress;
+
+static void
+ParallelCompress_dealloc(ParallelCompress *self)
+{
+    PyMem_Free(self->buffer);
+    if (self->is_initialised) {
+        zng_deflateEnd(&self->zst);
+    }
+    Py_TYPE(self)->tp_free((PyObject *)self);
+}
+
+static PyObject *
+ParallelCompress__new__(PyTypeObject *type, PyObject *args, PyObject *kwargs)
+{
+    Py_ssize_t buffer_size = 0;
+    int level = Z_DEFAULT_COMPRESSION;
+    static char *format = "n|i:ParallelCompress__new__";
+    static char *kwarg_names[] = {"buffersize", "level", NULL};
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, format, kwarg_names,
+                                     &buffer_size, &level)) {
+        return NULL;
+    }
+    if (buffer_size > UINT32_MAX) {
+        PyErr_Format(PyExc_ValueError,
+                     "buffersize must be at most %zd, got %zd",
+                     (Py_ssize_t)UINT32_MAX, buffer_size);
+        return NULL;
+    }
+    ParallelCompress *self = PyObject_New(ParallelCompress, type);
+    if (self == NULL) {
+        return PyErr_NoMemory();
+    }
+    self->buffer = NULL;
+    self->zst.next_in = NULL;
+    self->zst.avail_in = 0;
+    self->zst.next_out = NULL;
+    self->zst.avail_out = 0;
+    self->zst.opaque = NULL;
+    self->zst.zalloc = PyZlib_Malloc;
+    self->zst.zfree = PyZlib_Free;
+    self->is_initialised = 0;
+    int err = zng_deflateInit2(&self->zst, level, DEFLATED, -MAX_WBITS,
+                               DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY);
+    switch (err) {
+    case Z_OK:
+        break;
+    case Z_MEM_ERROR:
+        PyErr_SetString(PyExc_MemoryError,
+                        "Out of memory while compressing data");
+        Py_DECREF(self);
+        return NULL;
+    case Z_STREAM_ERROR:
+        PyErr_SetString(ZlibError, "Bad compression level");
+        Py_DECREF(self);
+        return NULL;
+    default:
+        zng_deflateEnd(&self->zst);
+        zlib_error(self->zst, err, "while compressing data");
+        Py_DECREF(self);
+        return NULL;
+    }
+    self->is_initialised = 1;
+    uint8_t *buffer = PyMem_Malloc(buffer_size);
+    if (buffer == NULL) {
+        Py_DECREF(self);
+        return PyErr_NoMemory();
+    }
+    self->buffer = buffer;
+    self->buffer_size = buffer_size;
+    return (PyObject *)self;
+}
+
+
+PyDoc_STRVAR(ParallelCompress_compress_and_crc__doc__,
+"compress_and_crc($self, data, zdict, /)\n"
+"--\n"
+"\n"
+"Function specifically designed for use in parallel compression. Data is\n"
+"compressed using deflate and Z_SYNC_FLUSH is used to ensure the block aligns\n"
+"to a byte boundary. Also the CRC is calculated. This function is designed to\n"
+"maximize the time spent outside the GIL.\n"
+"\n"
+"  data\n"
+"    bytes-like object containing the data to be compressed\n"
+"  zdict\n"
+"    last 32 KiB of the previous block\n"
+);
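+
+/* Each block is compressed with Z_SYNC_FLUSH so it ends on a byte boundary
+ * and the per-block streams can simply be concatenated. Seeding the
+ * compressor with the last 32 KiB of the previous block (one full deflate
+ * window) as a dictionary preserves most of the compression ratio a single
+ * stream would achieve. */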
+#define PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF \
+    { \
+        "compress_and_crc", (PyCFunction)ParallelCompress_compress_and_crc, \
+        METH_FASTCALL, ParallelCompress_compress_and_crc__doc__}
+
+static PyObject *
+ParallelCompress_compress_and_crc(ParallelCompress *self,
+                                  PyObject *const *args,
+                                  Py_ssize_t nargs)
+{
+    if (nargs != 2) {
+        PyErr_Format(
+            PyExc_TypeError,
+            "compress_and_crc takes exactly 2 arguments, got %zd",
+            nargs);
+        return NULL;
+    }
+    Py_buffer data;
+    Py_buffer zdict;
+    if (PyObject_GetBuffer(args[0], &data, PyBUF_SIMPLE) == -1) {
+        return NULL;
+    }
+    if (PyObject_GetBuffer(args[1], &zdict, PyBUF_SIMPLE) == -1) {
+        PyBuffer_Release(&data);
+        return NULL;
+    }
+
+    if (data.len + zdict.len > UINT32_MAX) {
+        PyErr_Format(PyExc_OverflowError,
+                     "Can only compress %u bytes of data", UINT32_MAX);
+        goto error;
+    }
+    PyThreadState *_save;
+    Py_UNBLOCK_THREADS
+    int err = zng_deflateReset(&self->zst);
+    if (err != Z_OK) {
+        Py_BLOCK_THREADS;
+        zlib_error(self->zst, err, "error resetting deflate state");
+        goto error;
+    }
+    self->zst.avail_in = data.len;
+    self->zst.next_in = data.buf;
+    self->zst.next_out = self->buffer;
+    self->zst.avail_out = self->buffer_size;
+    err = zng_deflateSetDictionary(&self->zst, zdict.buf, zdict.len);
+    if (err != Z_OK) {
+        Py_BLOCK_THREADS;
+        zlib_error(self->zst, err, "error setting dictionary");
+        goto error;
+    }
+    uint32_t crc = zng_crc32_z(0, data.buf, data.len);
+    err = zng_deflate(&self->zst, Z_SYNC_FLUSH);
+    Py_BLOCK_THREADS;
+
+    if (err != Z_OK) {
+        zlib_error(self->zst, err, "while compressing data");
+        goto error;
+    }
+    if (self->zst.avail_out == 0) {
+        PyErr_Format(
+            PyExc_OverflowError,
+            "Compressed output exceeds buffer size of %u", self->buffer_size
+        );
+        goto error;
+    }
+    if (self->zst.avail_in != 0) {
+        PyErr_Format(
+            PyExc_RuntimeError,
+            "Developer error: input bytes are still available: %u. "
+            "Please contact the developers by creating an issue at "
+            "https://github.com/pycompression/python-zlib-ng/issues",
+            self->zst.avail_in);
+        goto error;
+    }
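+    /* next_out - buffer is the number of bytes deflate produced. Copy them
+       into an immutable bytes object so the reusable buffer is free for the
+       next block. */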
" + "Please contact the developers by creating an issue at " + "https://github.com/pycompression/python-isal/issues", + self->zst.avail_in); + goto error; + } + PyObject *out_tup = PyTuple_New(2); + PyObject *crc_obj = PyLong_FromUnsignedLong(crc); + PyObject *out_bytes = PyBytes_FromStringAndSize( + (char *)self->buffer, self->zst.next_out - self->buffer); + if (out_bytes == NULL || out_tup == NULL || crc_obj == NULL) { + Py_XDECREF(out_bytes); Py_XDECREF(out_tup); Py_XDECREF(crc_obj); + goto error; + } + PyBuffer_Release(&data); + PyBuffer_Release(&zdict); + PyTuple_SET_ITEM(out_tup, 0, out_bytes); + PyTuple_SET_ITEM(out_tup, 1, crc_obj); + return out_tup; +error: + PyBuffer_Release(&data); + PyBuffer_Release(&zdict); + return NULL; +} + +static PyMethodDef ParallelCompress_methods[] = { + PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF, + {NULL}, +}; + +static PyTypeObject ParallelCompress_Type = { + .tp_name = "isal_zlib._ParallelCompress", + .tp_basicsize = sizeof(ParallelCompress), + .tp_doc = PyDoc_STR( + "A reusable zstream and buffer fast parallel compression."), + .tp_dealloc = (destructor)ParallelCompress_dealloc, + .tp_new = ParallelCompress__new__, + .tp_methods = ParallelCompress_methods, +}; + PyDoc_STRVAR(zlib_compress__doc__, "compress($module, data, /, level=Z_DEFAULT_COMPRESSION, wbits=MAX_WBITS)\n" "--\n" @@ -2700,6 +2935,7 @@ static PyMethodDef zlib_methods[] = ZLIB_COMPRESS_METHODDEF, ZLIB_COMPRESSOBJ_METHODDEF, ZLIB_CRC32_METHODDEF, + ZLIB_CRC32_COMBINE_METHODDEF, ZLIB_DECOMPRESS_METHODDEF, ZLIB_DECOMPRESSOBJ_METHODDEF, {NULL, NULL} @@ -2763,6 +2999,15 @@ PyInit_zlib_ng(void) return NULL; } + if (PyType_Ready(&ParallelCompress_Type) != 0) { + return NULL; + } + Py_INCREF(&ParallelCompress_Type); + if (PyModule_AddObject(m, "_ParallelCompress", + (PyObject *)&ParallelCompress_Type) < 0) { + return NULL; + } + ZlibError = PyErr_NewException("zlib_ng.error", NULL, NULL); if (ZlibError == NULL) { return NULL; diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py new file mode 100644 index 0000000..871e3ae --- /dev/null +++ b/tests/test_gzip_ng_threaded.py @@ -0,0 +1,192 @@ +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022 +# Python Software Foundation; All Rights Reserved + +# This file is part of python-isal which is distributed under the +# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2. + +import gzip +import io +import itertools +import os +import tempfile +from pathlib import Path + +import pytest + +from zlib_ng import gzip_ng_threaded, zlib_ng + +TEST_FILE = str((Path(__file__).parent / "data" / "test.fastq.gz")) + + +def test_threaded_read(): + with gzip_ng_threaded.open(TEST_FILE, "rb") as thread_f: + thread_data = thread_f.read() + with gzip.open(TEST_FILE, "rb") as f: + data = f.read() + assert thread_data == data + + +@pytest.mark.parametrize(["mode", "threads"], + itertools.product(["wb", "wt"], [1, 3, -1])) +def test_threaded_write(mode, threads): + with tempfile.NamedTemporaryFile("wb", delete=False) as tmp: + # Use a small block size to simulate many writes. 
+        with gzip_ng_threaded.open(tmp, mode, threads=threads,
+                                   block_size=8*1024) as out_file:
+            gzip_open_mode = "rb" if "b" in mode else "rt"
+            with gzip.open(TEST_FILE, gzip_open_mode) as in_file:
+                while True:
+                    block = in_file.read(128 * 1024)
+                    if not block:
+                        break
+                    out_file.write(block)
+    with gzip.open(TEST_FILE, "rt") as test_file:
+        test_data = test_file.read()
+    with gzip.open(tmp.name, "rt") as test_out:
+        out_data = test_out.read()
+    assert test_data == out_data
+
+
+def test_threaded_open_no_threads():
+    with tempfile.TemporaryFile("rb") as tmp:
+        klass = gzip_ng_threaded.open(tmp, "rb", threads=0)
+        # gzip_ng.GzipNGFile inherits from gzip.GzipFile
+        assert isinstance(klass, gzip.GzipFile)
+
+
+def test_threaded_open_not_a_file_or_pathlike():
+    i_am_a_tuple = (1, 2, 3)
+    with pytest.raises(TypeError) as error:
+        gzip_ng_threaded.open(i_am_a_tuple)
+    error.match("str")
+    error.match("bytes")
+    error.match("file")
+
+
+# Test whether threaded readers and writers throw an error rather than hang
+# indefinitely.
+
+@pytest.mark.timeout(5)
+def test_threaded_read_error():
+    with open(TEST_FILE, "rb") as f:
+        data = f.read()
+    truncated_data = data[:-8]
+    with gzip_ng_threaded.open(io.BytesIO(truncated_data), "rb") as tr_f:
+        with pytest.raises(EOFError):
+            tr_f.read()
+
+
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("threads", [1, 3])
+def test_threaded_write_oversized_block_no_error(threads):
+    # Random bytes are incompressible, and therefore are guaranteed to
+    # trigger a buffer overflow when larger than block size unless handled
+    # correctly.
+    data = os.urandom(1024 * 63)  # not a multiple of block_size
+    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp:
+        with gzip_ng_threaded.open(
+            tmp, "wb", compresslevel=3, threads=threads,
+            block_size=8 * 1024
+        ) as writer:
+            writer.write(data)
+    with gzip.open(tmp.name, "rb") as gzipped:
+        decompressed = gzipped.read()
+    assert data == decompressed
+
+
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("threads", [1, 3])
+def test_threaded_write_error(threads):
+    f = gzip_ng_threaded._ThreadedGzipWriter(
+        io.BytesIO(), level=3,
+        threads=threads, block_size=8 * 1024)
+    # Bypass the write method which should not allow blocks larger than
+    # block_size.
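+    # 64 KiB of incompressible data cannot fit the 8 KiB + ~10% output
+    # buffer, so compress_and_crc raises OverflowError in the worker thread;
+    # close() re-raises it on the main thread.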
+    f.input_queues[0].put((os.urandom(1024 * 64), b""))
+    with pytest.raises(OverflowError) as error:
+        f.close()
+    error.match("Compressed output exceeds buffer size")
+
+
+def test_close_reader():
+    tmp = io.BytesIO(Path(TEST_FILE).read_bytes())
+    f = gzip_ng_threaded._ThreadedGzipReader(tmp)
+    f.close()
+    assert f.closed
+    # Make sure double closing does not raise errors
+    f.close()
+
+
+@pytest.mark.parametrize("threads", [1, 3])
+def test_close_writer(threads):
+    f = gzip_ng_threaded._ThreadedGzipWriter(
+        io.BytesIO(), threads=threads)
+    f.close()
+    assert f.closed
+    # Make sure double closing does not raise errors
+    f.close()
+
+
+def test_reader_not_writable():
+    with gzip_ng_threaded.open(TEST_FILE, "rb") as f:
+        assert not f.writable()
+
+
+def test_writer_not_readable():
+    with gzip_ng_threaded.open(io.BytesIO(), "wb") as f:
+        assert not f.readable()
+
+
+def test_writer_wrong_level():
+    with tempfile.NamedTemporaryFile("wb") as tmp:
+        with pytest.raises(zlib_ng.error) as error:
+            gzip_ng_threaded.open(tmp.name, mode="wb", compresslevel=42)
+        error.match("Bad compression level")
+
+
+def test_writer_too_low_threads():
+    with pytest.raises(ValueError) as error:
+        gzip_ng_threaded._ThreadedGzipWriter(io.BytesIO(), threads=0)
+    error.match("threads")
+    error.match("at least 1")
+
+
+def test_reader_read_after_close():
+    with open(TEST_FILE, "rb") as test_f:
+        f = gzip_ng_threaded._ThreadedGzipReader(test_f)
+        f.close()
+        with pytest.raises(ValueError) as error:
+            f.read(1024)
+        error.match("closed")
+
+
+@pytest.mark.parametrize("threads", [1, 3])
+def test_writer_write_after_close(threads):
+    f = gzip_ng_threaded._ThreadedGzipWriter(io.BytesIO(), threads=threads)
+    f.close()
+    with pytest.raises(ValueError) as error:
+        f.write(b"abc")
+    error.match("closed")
+
+
+def test_gzip_ng_threaded_append(tmp_path):
+    test_file = tmp_path / "test.txt.gz"
+    with gzip_ng_threaded.open(test_file, "wb") as f:
+        f.write(b"AB")
+    with gzip_ng_threaded.open(test_file, mode="ab") as f:
+        f.write(b"CD")
+    with gzip.open(test_file, "rb") as f:
+        contents = f.read()
+    assert contents == b"ABCD"
+
+
+def test_gzip_ng_threaded_append_text_mode(tmp_path):
+    test_file = tmp_path / "test.txt.gz"
+    with gzip_ng_threaded.open(test_file, "wt") as f:
+        f.write("AB")
+    with gzip_ng_threaded.open(test_file, mode="at") as f:
+        f.write("CD")
+    with gzip.open(test_file, "rt") as f:
+        contents = f.read()
+    assert contents == "ABCD"
diff --git a/tests/test_zlib_compliance.py b/tests/test_zlib_compliance.py
index 1f8ff53..3d5ab65 100644
--- a/tests/test_zlib_compliance.py
+++ b/tests/test_zlib_compliance.py
@@ -108,6 +108,17 @@ def test_crc32_adler32_unsigned(self):
         self.assertEqual(zlib.adler32(foo + foo), 3573550353)
         self.assertEqual(zlib.adler32(b'spam'), 72286642)
 
+    def test_crc32_combine(self):
+        foo = b'abcdefghijklmnop'
+        self.assertEqual(zlib.crc32_combine(0, 0, 0), 0)
+        self.assertEqual(zlib.crc32_combine(1, 0, 0), 1)
+        self.assertEqual(zlib.crc32_combine(432, 0, 0), 432)
+        self.assertEqual(
+            zlib.crc32_combine(
+                zlib.crc32(foo), zlib.crc32(foo), len(foo)),
+            zlib.crc32(foo + foo)
+        )
+
     def test_same_as_binascii_crc32(self):
         foo = b'abcdefghijklmnop'
         crc = 2486878355
diff --git a/tox.ini b/tox.ini
index ace675b..06b7f91 100644
--- a/tox.ini
+++ b/tox.ini
@@ -7,6 +7,7 @@ isolated_build=True
 [testenv]
 deps=pytest
+    pytest-timeout
     coverage
 passenv=
     PYTHON_ZLIB_NG_LINK_DYNAMIC