diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index a4eaa79..6a38039 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -7,6 +7,18 @@ Changelog
 .. This document is user facing. Please word the changes in such a way
 .. that users understand how the changes affect the new version.
 
+version 0.4.0
+-----------------
++ Add a ``gzip_ng_threaded`` module that contains the ``gzip_ng_threaded.open``
+  function. This allows using multithreaded compression as well as escaping the
+  GIL.
++ The internal ``gzip_ng._GzipReader`` has been rewritten in C. As a result,
+  the overhead of decompressing files has been significantly reduced.
++ The ``gzip_ng._GzipReader`` in C is now used in ``gzip_ng.decompress``. The
+  ``_GzipReader`` can also read from objects that support the buffer protocol,
+  which reduces overhead significantly.
++ Fix some unclosed buffer errors in the gzip_ng CLI.
+
 version 0.3.0
 -----------------
 + Source distributions on Linux now default to building with configure and
diff --git a/README.rst b/README.rst
index eef79dc..e232dcf 100644
--- a/README.rst
+++ b/README.rst
@@ -42,7 +42,7 @@ by providing Python bindings for the zlib-ng library.
 This package provides Python bindings for the `zlib-ng
 <https://github.com/zlib-ng/zlib-ng>`_ library.
 
-``python-zlib-ng`` provides the bindings by offering two modules:
+``python-zlib-ng`` provides the bindings by offering three modules:
 
 + ``zlib_ng``: A drop-in replacement for the zlib module that uses zlib-ng
   to accelerate its performance.
@@ -51,6 +51,11 @@ This package provides Python bindings for the `zlib-ng
   instead of ``zlib`` to perform its compression and checksum tasks, which
   improves performance.
 
++ ``gzip_ng_threaded`` offers an ``open`` function which returns buffered read
+  or write streams that can be used to read and write large files while
+  escaping the GIL using one or multiple threads. This functionality only
+  works for streaming; seeking is not supported.
+
 ``zlib_ng`` and ``gzip_ng`` are almost fully compatible with ``zlib`` and
 ``gzip`` from the Python standard library. There are some minor differences
 see: differences-with-zlib-and-gzip-modules_.
@@ -68,6 +73,7 @@ The python-zlib-ng modules can be imported as follows
 
     from zlib_ng import zlib_ng
     from zlib_ng import gzip_ng
+    from zlib_ng import gzip_ng_threaded
 
 ``zlib_ng`` and ``gzip_ng`` are meant to be used as drop in replacements so
 their api and functions are the same as the stdlib's modules.
diff --git a/docs/index.rst b/docs/index.rst
index fbc8261..8892cbd 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -113,6 +113,13 @@ API-documentation: zlib_ng.gzip_ng
    :members:
    :special-members: __init__
 
+===========================================
+API-documentation: zlib_ng.gzip_ng_threaded
+===========================================
+
+.. automodule:: zlib_ng.gzip_ng_threaded
+   :members: open
+
 ===============================
 python -m zlib_ng.gzip_ng usage
 ===============================
diff --git a/setup.py b/setup.py
index e322e1c..fdace0f 100644
--- a/setup.py
+++ b/setup.py
@@ -123,7 +123,7 @@ def build_zlib_ng():
 
 setup(
     name="zlib-ng",
-    version="0.3.0",
+    version="0.4.0",
     description="Drop-in replacement for zlib and gzip modules using zlib-ng",
     author="Leiden University Medical Center",
     author_email="r.h.p.vorderman@lumc.nl",  # A placeholder for now
diff --git a/src/zlib_ng/__init__.py b/src/zlib_ng/__init__.py
index 89918cc..5d5979e 100644
--- a/src/zlib_ng/__init__.py
+++ b/src/zlib_ng/__init__.py
@@ -5,4 +5,4 @@
 # This file is part of python-zlib-ng which is distributed under the
 # PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2.
 
-__version__ = "0.3.0"
+__version__ = "0.4.0"
diff --git a/src/zlib_ng/gzip_ng.py b/src/zlib_ng/gzip_ng.py
index 70eede2..33db4be 100644
--- a/src/zlib_ng/gzip_ng.py
+++ b/src/zlib_ng/gzip_ng.py
@@ -25,9 +25,9 @@ import struct
 import sys
 import time
 
-import _compression  # noqa: I201  # Not third-party
 
 from . import zlib_ng
+from .zlib_ng import _GzipReader
 
 __all__ = ["GzipFile", "open", "compress", "decompress", "BadGzipFile",
            "READ_BUFFER_SIZE"]
@@ -36,19 +36,14 @@ _COMPRESS_LEVEL_TRADEOFF = zlib_ng.Z_DEFAULT_COMPRESSION
 _COMPRESS_LEVEL_BEST = zlib_ng.Z_BEST_COMPRESSION
 
 
-#: The amount of data that is read in at once when decompressing a file.
-#: Increasing this value may increase performance.
-#: 128K is also the size used by pigz and cat to read files from the
-# filesystem.
-READ_BUFFER_SIZE = 128 * 1024
+# The amount of data that is read in at once when decompressing a file.
+# Increasing this value may increase performance.
+READ_BUFFER_SIZE = 512 * 1024
 
 FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT = 1, 2, 4, 8, 16
 
 READ, WRITE = 1, 2
 
-try:
-    BadGzipFile = gzip.BadGzipFile  # type: ignore
-except AttributeError:  # Versions lower than 3.8 do not have BadGzipFile
-    BadGzipFile = OSError  # type: ignore
+BadGzipFile = gzip.BadGzipFile  # type: ignore
 
 
 # The open method was copied from the CPython source with minor adjustments.
@@ -149,7 +144,7 @@ def __init__(self, filename=None, mode=None,
                                                  zlib_ng.DEF_MEM_LEVEL,
                                                  0)
         if self.mode == READ:
-            raw = _GzipNGReader(self.fileobj)
+            raw = _GzipReader(self.fileobj, READ_BUFFER_SIZE)
             self._buffer = io.BufferedReader(raw)
 
     def __repr__(self):
@@ -180,124 +175,9 @@ def write(self, data):
         return length
 
 
-class _GzipNGReader(gzip._GzipReader):
-    def __init__(self, fp):
-        # Call the init method of gzip._GzipReader's parent here.
-        # It is not very invasive and allows us to override _PaddedFile
-        _compression.DecompressReader.__init__(
-            self, gzip._PaddedFile(fp), zlib_ng._ZlibDecompressor,
-            wbits=-zlib_ng.MAX_WBITS)
-        # Set flag indicating start of a new member
-        self._new_member = True
-        self._last_mtime = None
-
-    def read(self, size=-1):
-        if size < 0:
-            return self.readall()
-        # size=0 is special because decompress(max_length=0) is not supported
-        if not size:
-            return b""
-
-        # For certain input data, a single
-        # call to decompress() may not return
-        # any data. In this case, retry until we get some data or reach EOF.
-        while True:
-            if self._decompressor.eof:
-                # Ending case: we've come to the end of a member in the file,
-                # so finish up this member, and read a new gzip header.
-                # Check the CRC and file size, and set the flag so we read
-                # a new member
-                self._read_eof()
-                self._new_member = True
-                self._decompressor = self._decomp_factory(
-                    **self._decomp_args)
-
-            if self._new_member:
-                # If the _new_member flag is set, we have to
-                # jump to the next member, if there is one.
-                self._init_read()
-                if not self._read_gzip_header():
-                    self._size = self._pos
-                    return b""
-                self._new_member = False
-
-            # Read a chunk of data from the file
-            if self._decompressor.needs_input:
-                buf = self._fp.read(READ_BUFFER_SIZE)
-                uncompress = self._decompressor.decompress(buf, size)
-            else:
-                uncompress = self._decompressor.decompress(b"", size)
-            if self._decompressor.unused_data != b"":
-                # Prepend the already read bytes to the fileobj so they can
-                # be seen by _read_eof() and _read_gzip_header()
-                self._fp.prepend(self._decompressor.unused_data)
-
-            if uncompress != b"":
-                break
-            if buf == b"":
-                raise EOFError("Compressed file ended before the "
-                               "end-of-stream marker was reached")
-
-        self._crc = zlib_ng.crc32(uncompress, self._crc)
-        self._stream_size += len(uncompress)
-        self._pos += len(uncompress)
-        return uncompress
-
-
 # Aliases for improved compatibility with CPython gzip module.
 GzipFile = GzipNGFile
-_GzipReader = _GzipNGReader
-
-
-def _read_exact(fp, n):
-    '''Read exactly *n* bytes from `fp`
-    This method is required because fp may be unbuffered,
-    i.e. return short reads.
-    '''
-    data = fp.read(n)
-    while len(data) < n:
-        b = fp.read(n - len(data))
-        if not b:
-            raise EOFError("Compressed file ended before the "
-                           "end-of-stream marker was reached")
-        data += b
-    return data
-
-
-def _read_gzip_header(fp):
-    '''Read a gzip header from `fp` and progress to the end of the header.
-    Returns last mtime if header was present or None otherwise.
-    '''
-    magic = fp.read(2)
-    if magic == b'':
-        return None
-
-    if magic != b'\037\213':
-        raise BadGzipFile('Not a gzipped file (%r)' % magic)
-
-    (method, flag, last_mtime) = struct.unpack("<BBIxx", _read_exact(fp, 8))
+    def readable(self) -> bool:
+        return True
+
+    def tell(self) -> int:
+        self._check_closed()
+        return self.pos
+
+    def close(self) -> None:
+        if self._closed:
+            return
+        self.running = False
+        self.worker.join()
+        self.fileobj.close()
+        self.raw.close()
+        self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+
+class _ThreadedGzipWriter(io.RawIOBase):
+    """
+    Write a gzip file using multiple threads.
+
+    This class is heavily inspired by pigz from Mark Adler
+    (https://github.com/madler/pigz). It works similarly.
+
+    Each thread gets its own input and output queue. The main thread
+    distributes blocks over the input queues in round-robin fashion, using an
+    index. The writer thread reads from the output queues in the same
+    round-robin fashion. This way all the blocks will be written to the
+    output stream in order while still allowing independent compression for
+    each thread.
+
+    Writing to the ThreadedGzipWriter happens on the main thread in an
+    io.BufferedWriter. The BufferedWriter will offer a memoryview of its
+    buffer. Using the bytes constructor, this is made into an immutable block
+    of data.
+
+    A reference to the previous block is used to create a memoryview of the
+    last 32k of that block. This is used as a dictionary for the compression,
+    allowing for better compression ratios.
+
+    The current block and the dictionary are pushed into an input queue. They
+    are picked up by a compression worker that calculates the crc32, the
+    length of the data and compresses the block. The compressed block, checksum
+    and length are pushed into an output queue.
+
+    The writer thread reads from output queues and uses the crc32_combine
+    function to calculate the total crc. It also writes the compressed block.
+
+    When only one thread is requested, only the input queue is used and
+    compression and output are handled in one thread.
+    """
+    def __init__(self,
+                 filename,
+                 mode: str = "wb",
+                 level: int = zlib_ng.Z_DEFAULT_COMPRESSION,
+                 threads: int = 1,
+                 queue_size: int = 1,
+                 block_size: int = 1024 * 1024,
+                 ):
+        # The file is considered closed during init, so the __exit__ method
+        # does not touch self.raw before it is initialized.
+        self._closed = True
+        if "t" in mode or "r" in mode:
+            raise ValueError("Only binary writing is supported")
+        if "b" not in mode:
+            mode += "b"
+        self.lock = threading.Lock()
+        self.exception: Optional[Exception] = None
+        self.level = level
+        self.previous_block = b""
+        # Deflating random data results in an output a little larger than the
+        # input. Making the output buffer 10% larger is sufficient overkill.
+        compress_buffer_size = block_size + max(block_size // 10, 500)
+        self.block_size = block_size
+        self.compressors: List[zlib_ng._ParallelCompress] = [
+            zlib_ng._ParallelCompress(buffersize=compress_buffer_size,
+                                      level=level) for _ in range(threads)
+        ]
+        if threads > 1:
+            self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
+                queue.Queue(queue_size) for _ in range(threads)]
+            self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
+                queue.Queue(queue_size) for _ in range(threads)]
+            self.output_worker = threading.Thread(target=self._write)
+            self.compression_workers = [
+                threading.Thread(target=self._compress, args=(i,))
+                for i in range(threads)
+            ]
+        elif threads == 1:
+            self.input_queues = [queue.Queue(queue_size)]
+            self.output_queues = []
+            self.compression_workers = []
+            self.output_worker = threading.Thread(
+                target=self._compress_and_write)
+        else:
+            raise ValueError(f"threads should be at least 1, got {threads}")
+        self.threads = threads
+        self.index = 0
+        self._crc = 0
+        self.running = False
+        self._size = 0
+        self.raw = open_as_binary_stream(filename, mode)
+        self._closed = False
+        self._write_gzip_header()
+        self.start()
+
+    def _check_closed(self, msg=None):
+        if self._closed:
+            raise ValueError("I/O operation on closed file")
+
+    def _write_gzip_header(self):
+        """Simple gzip header. Only the xfl flag is set according to level."""
+        magic1 = 0x1f
+        magic2 = 0x8b
+        method = 0x08
+        flags = 0
+        mtime = 0
+        os = 0xff
+        if self.level == zlib_ng.Z_BEST_COMPRESSION:
+            xfl = 2
+        elif self.level == zlib_ng.Z_BEST_SPEED:
+            xfl = 4
+        else:
+            xfl = 0
+        self.raw.write(struct.pack(
+            "<BBBBIBB", magic1, magic2, method, flags, mtime, xfl, os))
+
+    def start(self):
+        self.running = True
+        self.output_worker.start()
+        for worker in self.compression_workers:
+            worker.start()
+
+    def stop(self):
+        """Stop, but do not care for remaining work"""
+        self.running = False
+        for worker in self.compression_workers:
+            worker.join()
+        self.output_worker.join()
+
+    def write(self, b) -> int:
+        self._check_closed()
+        with self.lock:
+            if self.exception:
+                raise self.exception
+        length = b.nbytes if isinstance(b, memoryview) else len(b)
+        if length > self.block_size:
+            # write smaller chunks and return the result
+            memview = memoryview(b)
+            start = 0
+            total_written = 0
+            while start < length:
+                total_written += self.write(
+                    memview[start:start+self.block_size])
+                start += self.block_size
+            return total_written
+        data = bytes(b)
+        index = self.index
+        zdict = memoryview(self.previous_block)[-DEFLATE_WINDOW_SIZE:]
+        self.previous_block = data
+        self.index += 1
+        worker_index = index % self.threads
+        self.input_queues[worker_index].put((data, zdict))
+        return len(data)
+
+    def flush(self):
+        self._check_closed()
+        # Wait for all data to be compressed
+        for in_q in self.input_queues:
+            in_q.join()
+        # Wait for all data to be written
+        for out_q in self.output_queues:
+            out_q.join()
+        self.raw.flush()
+
+    def close(self) -> None:
+        if self._closed:
+            return
+        self.flush()
+        self.stop()
+        if self.exception:
+            self.raw.close()
+            self._closed = True
+            raise self.exception
+        # Write an empty deflate block with a last block marker.
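+        # (zlib_ng.compress(b"", wbits=-15) produces b"\x03\x00": an empty
+        # final deflate block that terminates the raw deflate stream.)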
+        self.raw.write(zlib_ng.compress(b"", wbits=-15))
+        trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
+        self.raw.write(trailer)
+        self.raw.flush()
+        self.raw.close()
+        self._closed = True
+
+    @property
+    def closed(self) -> bool:
+        return self._closed
+
+    def _compress(self, index: int):
+        in_queue = self.input_queues[index]
+        out_queue = self.output_queues[index]
+        compressor: zlib_ng._ParallelCompress = self.compressors[index]
+        while True:
+            try:
+                data, zdict = in_queue.get(timeout=0.05)
+            except queue.Empty:
+                if not self.running:
+                    return
+                continue
+            try:
+                compressed, crc = compressor.compress_and_crc(data, zdict)
+            except Exception as e:
+                in_queue.task_done()
+                self._set_error_and_empty_queue(e, in_queue)
+                return
+            data_length = len(data)
+            out_queue.put((compressed, crc, data_length))
+            in_queue.task_done()
+
+    def _write(self):
+        index = 0
+        output_queues = self.output_queues
+        fp = self.raw
+        total_crc = 0
+        size = 0
+        while True:
+            out_index = index % self.threads
+            output_queue = output_queues[out_index]
+            try:
+                compressed, crc, data_length = output_queue.get(timeout=0.05)
+            except queue.Empty:
+                if not self.running:
+                    self._crc = total_crc
+                    self._size = size
+                    return
+                continue
+            total_crc = zlib_ng.crc32_combine(total_crc, crc, data_length)
+            size += data_length
+            fp.write(compressed)
+            output_queue.task_done()
+            index += 1
+
+    def _compress_and_write(self):
+        if self.threads != 1:
+            raise SystemError("_compress_and_write is for one thread only")
+        fp = self.raw
+        total_crc = 0
+        size = 0
+        in_queue = self.input_queues[0]
+        compressor = self.compressors[0]
+        while True:
+            try:
+                data, zdict = in_queue.get(timeout=0.05)
+            except queue.Empty:
+                if not self.running:
+                    self._crc = total_crc
+                    self._size = size
+                    return
+                continue
+            try:
+                compressed, crc = compressor.compress_and_crc(data, zdict)
+            except Exception as e:
+                in_queue.task_done()
+                self._set_error_and_empty_queue(e, in_queue)
+                return
+            data_length = len(data)
+            total_crc = zlib_ng.crc32_combine(total_crc, crc, data_length)
+            size += data_length
+            fp.write(compressed)
+            in_queue.task_done()
+
+    def _set_error_and_empty_queue(self, error, q):
+        with self.lock:
+            self.exception = error
+            # Abort everything and empty the queue
+            self.running = False
+            while True:
+                try:
+                    _ = q.get(timeout=0.05)
+                    q.task_done()
+                except queue.Empty:
+                    return
+
+    def writable(self) -> bool:
+        return True
diff --git a/src/zlib_ng/zlib_ng.pyi b/src/zlib_ng/zlib_ng.pyi
index f5d7dee..775340c 100644
--- a/src/zlib_ng/zlib_ng.pyi
+++ b/src/zlib_ng/zlib_ng.pyi
@@ -5,6 +5,8 @@
 # This file is part of python-zlib-ng which is distributed under the
 # PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2.
 
+import typing
+
 MAX_WBITS: int
 DEFLATED: int
 DEF_MEM_LEVEL: int
@@ -38,6 +40,7 @@ error: Exception
 
 def adler32(__data, __value: int = 1) -> int: ...
 def crc32(__data, __value: int = 0) -> int: ...
+def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ...
 
 def compress(__data,
              level: int = Z_DEFAULT_COMPRESSION,
@@ -67,6 +70,10 @@ def compressobj(level: int = Z_DEFAULT_COMPRESSION,
 def decompressobj(wbits: int = MAX_WBITS,
                   zdict = None) -> _Decompress: ...
 
+class _ParallelCompress:
+    def __init__(self, buffersize: int, level: int): ...
+    def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ...
+
 class _ZlibDecompressor:
     unused_data: bytes
     needs_input: bool
@@ -77,3 +84,16 @@ class _ZlibDecompressor:
                  zdict=None): ...
 
     def decompress(self, __data, max_length=-1) -> bytes: ...
+
+class _GzipReader:
+    def __init__(self, fp: typing.BinaryIO, buffersize: int = 32 * 1024): ...
+    def readinto(self, obj) -> int: ...
+    def readable(self) -> bool: ...
+    def writable(self) -> bool: ...
+    def seekable(self) -> bool: ...
+    def tell(self) -> int: ...
+    def seek(self, offset: int, whence: int): ...
+    def close(self): ...
+    def readall(self) -> bytes: ...
+    def read(self, __size: int): ...
+    def flush(self): ...
diff --git a/src/zlib_ng/zlib_ngmodule.c b/src/zlib_ng/zlib_ngmodule.c
index 5ecedb3..d0e528a 100644
--- a/src/zlib_ng/zlib_ngmodule.c
+++ b/src/zlib_ng/zlib_ngmodule.c
@@ -13,6 +13,19 @@
 #error "At least zlib-ng version 2.0.7 is required"
 #endif
 
+/* PyPy quirks: no Py_UNREACHABLE and requires PyBUF_READ and PyBUF_WRITE set
+   in memoryviews that enter a "readinto" call. CPython requires that only
+   PyBUF_WRITE is set.
+   (Both implementations are wrong because the state of the READ bit should
+   not matter.)
+*/
+#ifdef PYPY_VERSION
+#define Py_UNREACHABLE() Py_FatalError("Reached unreachable state")
+#define MEMORYVIEW_READINTO_FLAGS (PyBUF_READ | PyBUF_WRITE)
+#else
+#define MEMORYVIEW_READINTO_FLAGS PyBUF_WRITE
+#endif
+
 #define ENTER_ZLIB(obj) do { \
     if (!PyThread_acquire_lock((obj)->lock, 0)) { \
         Py_BEGIN_ALLOW_THREADS \
@@ -1548,6 +1561,241 @@ zlib_crc32(PyObject *module, PyObject *const *args, Py_ssize_t nargs)
     return return_value;
 }
 
+
+PyDoc_STRVAR(zlib_crc32_combine__doc__,
+"crc32_combine($module, crc1, crc2, crc2_length, /)\n"
+"--\n"
+"\n"
+"Combine crc1 and crc2 into a new crc that is accurate for the combined data\n"
+"blocks that crc1 and crc2 were calculated from.\n"
+"\n"
+"  crc1\n"
+"    the first crc32 checksum\n"
+"  crc2\n"
+"    the second crc32 checksum\n"
+"  crc2_length\n"
+"    the length of the data block crc2 was calculated from\n"
+);
+
+
+#define ZLIB_CRC32_COMBINE_METHODDEF \
+    {"crc32_combine", (PyCFunction)(void(*)(void))zlib_crc32_combine, \
+     METH_VARARGS, zlib_crc32_combine__doc__}
+
+static PyObject *
+zlib_crc32_combine(PyObject *module, PyObject *args) {
+    uint32_t crc1 = 0;
+    uint32_t crc2 = 0;
+    Py_ssize_t crc2_length = 0;
+    static char *format = "IIn:crc32_combine";
+    if (!PyArg_ParseTuple(args, format, &crc1, &crc2, &crc2_length)) {
+        return NULL;
+    }
+    return PyLong_FromUnsignedLong(
+        zng_crc32_combine(crc1, crc2, crc2_length) & 0xFFFFFFFF);
+}
+
+typedef struct {
+    PyObject_HEAD
+    uint8_t *buffer;
+    uint32_t buffer_size;
+    zng_stream zst;
+    uint8_t is_initialised;
+} ParallelCompress;
+
+static void
+ParallelCompress_dealloc(ParallelCompress *self)
+{
+    PyMem_Free(self->buffer);
+    if (self->is_initialised) {
+        zng_deflateEnd(&self->zst);
+    }
+    Py_TYPE(self)->tp_free((PyObject *)self);
+}
+
+static PyObject *
+ParallelCompress__new__(PyTypeObject *type, PyObject *args, PyObject *kwargs)
+{
+    Py_ssize_t buffer_size = 0;
+    int level = Z_DEFAULT_COMPRESSION;
+    static char *format = "n|i:ParallelCompress__new__";
+    static char *kwarg_names[] = {"buffersize", "level", NULL};
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, format, kwarg_names,
+                                     &buffer_size, &level)) {
+        return NULL;
+    }
+    if (buffer_size > UINT32_MAX) {
+        PyErr_Format(PyExc_ValueError,
+                     "buffersize must be at most %zd, got %zd",
+                     (Py_ssize_t)UINT32_MAX, buffer_size);
+        return NULL;
+    }
+    ParallelCompress *self = PyObject_New(ParallelCompress, type);
+    if (self == NULL) {
+        return PyErr_NoMemory();
+    }
+    self->buffer = NULL;
+    self->zst.next_in = NULL;
+    self->zst.avail_in = 0;
+    self->zst.next_out = NULL;
+    self->zst.avail_out = 0;
+    self->zst.opaque = NULL;
+    self->zst.zalloc = PyZlib_Malloc;
+    self->zst.zfree = PyZlib_Free;
+    self->is_initialised = 0;
+    int err = zng_deflateInit2(&self->zst, level, DEFLATED, -MAX_WBITS,
+                               DEF_MEM_LEVEL, Z_DEFAULT_STRATEGY);
+    switch (err) {
+    case Z_OK:
+        break;
+    case Z_MEM_ERROR:
+        PyErr_SetString(PyExc_MemoryError,
+                        "Out of memory while compressing data");
+        Py_DECREF(self);
+        return NULL;
+    case Z_STREAM_ERROR:
+        PyErr_SetString(ZlibError, "Bad compression level");
+        Py_DECREF(self);
+        return NULL;
+    default:
+        zng_deflateEnd(&self->zst);
+        zlib_error(self->zst, err, "while compressing data");
+        Py_DECREF(self);
+        return NULL;
+    }
+    self->is_initialised = 1;
+    uint8_t *buffer = PyMem_Malloc(buffer_size);
+    if (buffer == NULL) {
+        Py_DECREF(self);
+        return PyErr_NoMemory();
+    }
+    self->buffer = buffer;
+    self->buffer_size = buffer_size;
+    return (PyObject *)self;
+}
+
+
+PyDoc_STRVAR(ParallelCompress_compress_and_crc__doc__,
+"compress_and_crc($self, data, zdict, /)\n"
+"--\n"
+"\n"
+"Function specifically designed for use in parallel compression. Data is\n"
+"compressed using deflate and Z_SYNC_FLUSH is used to ensure the block aligns\n"
+"to a byte boundary. Also the CRC is calculated. This function is designed to\n"
+"maximize the time spent outside the GIL.\n"
+"\n"
+"  data\n"
+"    bytes-like object containing the data to be compressed\n"
+"  zdict\n"
+"    last 32 kilobytes of the previous block\n"
+);
+#define PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF \
+    { \
+        "compress_and_crc", (PyCFunction)ParallelCompress_compress_and_crc, \
+        METH_FASTCALL, ParallelCompress_compress_and_crc__doc__}
+
+static PyObject *
+ParallelCompress_compress_and_crc(ParallelCompress *self,
+                                  PyObject *const *args,
+                                  Py_ssize_t nargs)
+{
+    if (nargs != 2) {
+        PyErr_Format(
+            PyExc_TypeError,
+            "compress_and_crc takes exactly 2 arguments, got %zd",
+            nargs);
+        return NULL;
+    }
+    Py_buffer data;
+    Py_buffer zdict;
+    if (PyObject_GetBuffer(args[0], &data, PyBUF_SIMPLE) == -1) {
+        return NULL;
+    }
+    if (PyObject_GetBuffer(args[1], &zdict, PyBUF_SIMPLE) == -1) {
+        PyBuffer_Release(&data);
+        return NULL;
+    }
+
+    if (data.len + zdict.len > UINT32_MAX) {
+        PyErr_Format(PyExc_OverflowError,
+                     "Can only compress %u bytes of data", UINT32_MAX);
+        goto error;
+    }
+    PyThreadState *_save;
+    Py_UNBLOCK_THREADS
+    int err = zng_deflateReset(&self->zst);
+    if (err != Z_OK) {
+        Py_BLOCK_THREADS;
+        zlib_error(self->zst, err, "error resetting deflate state");
+        goto error;
+    }
+    self->zst.avail_in = data.len;
+    self->zst.next_in = data.buf;
+    self->zst.next_out = self->buffer;
+    self->zst.avail_out = self->buffer_size;
+    err = zng_deflateSetDictionary(&self->zst, zdict.buf, zdict.len);
+    if (err != Z_OK){
+        Py_BLOCK_THREADS;
+        zlib_error(self->zst, err, "error setting dictionary");
+        goto error;
+    }
+    uint32_t crc = zng_crc32_z(0, data.buf, data.len);
+    err = zng_deflate(&self->zst, Z_SYNC_FLUSH);
+    Py_BLOCK_THREADS;
+
+    if (err != Z_OK) {
+        zlib_error(self->zst, err, "error while compressing data");
+        goto error;
+    }
+    if (self->zst.avail_out == 0) {
+        PyErr_Format(
+            PyExc_OverflowError,
+            "Compressed output exceeds buffer size of %u", self->buffer_size
+        );
+        goto error;
+    }
+    if (self->zst.avail_in != 0) {
+        PyErr_Format(
+            PyExc_RuntimeError,
+            "Developer error: input bytes are still available: %u. "
+            "Please contact the developers by creating an issue at "
+            "https://github.com/pycompression/python-zlib-ng/issues",
+            self->zst.avail_in);
+        goto error;
+    }
+    PyObject *out_tup = PyTuple_New(2);
+    PyObject *crc_obj = PyLong_FromUnsignedLong(crc);
+    PyObject *out_bytes = PyBytes_FromStringAndSize(
+        (char *)self->buffer, self->zst.next_out - self->buffer);
+    if (out_bytes == NULL || out_tup == NULL || crc_obj == NULL) {
+        Py_XDECREF(out_bytes); Py_XDECREF(out_tup); Py_XDECREF(crc_obj);
+        goto error;
+    }
+    PyBuffer_Release(&data);
+    PyBuffer_Release(&zdict);
+    PyTuple_SET_ITEM(out_tup, 0, out_bytes);
+    PyTuple_SET_ITEM(out_tup, 1, crc_obj);
+    return out_tup;
+error:
+    PyBuffer_Release(&data);
+    PyBuffer_Release(&zdict);
+    return NULL;
+}
+
+static PyMethodDef ParallelCompress_methods[] = {
+    PARALLELCOMPRESS_COMPRESS_AND_CRC_METHODDEF,
+    {NULL},
+};
+
+static PyTypeObject ParallelCompress_Type = {
+    .tp_name = "zlib_ng._ParallelCompress",
+    .tp_basicsize = sizeof(ParallelCompress),
+    .tp_doc = PyDoc_STR(
+        "A reusable zstream and buffer for fast parallel compression."),
+    .tp_dealloc = (destructor)ParallelCompress_dealloc,
+    .tp_new = ParallelCompress__new__,
+    .tp_methods = ParallelCompress_methods,
+};
+
 PyDoc_STRVAR(zlib_compress__doc__,
 "compress($module, data, /, level=Z_DEFAULT_COMPRESSION, wbits=MAX_WBITS)\n"
 "--\n"
@@ -1959,12 +2207,735 @@ static PyTypeObject ZlibDecompressorType = {
 };
 
 
+#define GzipReader_HEADER 1
+#define GzipReader_DEFLATE_BLOCK 2
+#define GzipReader_TRAILER 3
+#define GzipReader_NULL_BYTES 4
+
+typedef struct _GzipReaderStruct {
+    PyObject_HEAD
+    uint8_t *input_buffer;
+    size_t buffer_size;
+    const uint8_t *current_pos;
+    const uint8_t *buffer_end;
+    int64_t _pos;
+    int64_t _size;
+    PyObject *fp;
+    Py_buffer *memview;
+    char stream_phase;
+    char all_bytes_read;
+    char closed;
+    uint32_t crc;
+    uint32_t stream_out;
+    uint32_t _last_mtime;
+    PyThread_type_lock lock;
+    zng_stream zst;
+} GzipReader;
+
+static void GzipReader_dealloc(GzipReader *self)
+{
+    if (self->memview == NULL) {
+        PyMem_Free(self->input_buffer);
+    } else {
+        PyBuffer_Release(self->memview);
+        PyMem_Free(self->memview);
+    }
+    Py_XDECREF(self->fp);
+    PyThread_free_lock(self->lock);
+    zng_inflateEnd(&self->zst);
+    Py_TYPE(self)->tp_free(self);
+}
+
+PyDoc_STRVAR(GzipReader__new____doc__,
+"_GzipReader(fp, /, buffersize=32*1024)\n"
+"--\n"
+"\n"
+"Return a _GzipReader object.\n"
+"\n"
+"  fp\n"
+"    can be a file-like binary IO object or a bytes-like object.\n"
+"    For file-like objects _GzipReader's internal buffer is filled using\n"
+"    fp's readinto method during reading. For bytes-like objects, the\n"
+"    buffer protocol is used which allows _GzipReader to use the object\n"
+"    itself as read buffer.\n"
+"  buffersize\n"
+"    Size of the internal buffer. Only used when fp is a file-like object.\n"
+"    The buffer is automatically resized to fit the largest gzip header\n"
+"    upon use of the _GzipReader object.\n"
+);
+
+static PyObject *
+GzipReader__new__(PyTypeObject *type, PyObject *args, PyObject *kwargs)
+{
+    PyObject *fp = NULL;
+    Py_ssize_t buffer_size = 32 * 1024;
+    static char *keywords[] = {"fp", "buffersize", NULL};
+    static char *format = "O|n:GzipReader";
+    if (!PyArg_ParseTupleAndKeywords(
+            args, kwargs, format, keywords, &fp, &buffer_size)) {
+        return NULL;
+    }
+    if (buffer_size < 1) {
+        PyErr_Format(
+            PyExc_ValueError,
+            "buffersize must be at least 1, got %zd", buffer_size
+        );
+        return NULL;
+    }
+    GzipReader *self = PyObject_New(GzipReader, type);
+    if (self == NULL) {
+        return PyErr_NoMemory();
+    }
+    if (PyObject_HasAttrString(fp, "read")) {
+        self->memview = NULL;
+        self->buffer_size = buffer_size;
+        self->input_buffer = PyMem_Malloc(self->buffer_size);
+        if (self->input_buffer == NULL) {
+            Py_DECREF(self);
+            return PyErr_NoMemory();
+        }
+        self->buffer_end = self->input_buffer;
+        self->all_bytes_read = 0;
+    } else {
+        self->memview = PyMem_Malloc(sizeof(Py_buffer));
+        if (self->memview == NULL) {
+            return PyErr_NoMemory();
+        }
+        if (PyObject_GetBuffer(fp, self->memview, PyBUF_SIMPLE) < 0) {
+            Py_DECREF(self);
+            return NULL;
+        }
+        self->buffer_size = self->memview->len;
+        self->input_buffer = self->memview->buf;
+        self->buffer_end = self->input_buffer + self->buffer_size;
+        self->all_bytes_read = 1;
+    }
+    self->current_pos = self->input_buffer;
+    self->_pos = 0;
+    self->_size = -1;
+    Py_INCREF(fp);
+    self->fp = fp;
+    self->stream_phase = GzipReader_HEADER;
+    self->closed = 0;
+    self->_last_mtime = 0;
+    self->crc = 0;
+    self->lock = PyThread_allocate_lock();
+    if (self->lock == NULL) {
+        Py_DECREF(self);
+        PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
+        return NULL;
+    }
+    self->zst.zalloc = PyZlib_Malloc;
+    self->zst.zfree = PyZlib_Free;
+    self->zst.next_in = NULL;
+    self->zst.avail_in = 0;
+    self->zst.opaque = NULL;
+    int err = zng_inflateInit2(&(self->zst), -MAX_WBITS);
+    switch (err) {
+    case Z_OK:
+        return (PyObject *)self;
+    case Z_STREAM_ERROR:
+        Py_DECREF(self);
+        PyErr_SetString(PyExc_ValueError, "Invalid initialization option");
+        return NULL;
+    case Z_MEM_ERROR:
+        Py_DECREF(self);
+        PyErr_SetString(PyExc_MemoryError,
+                        "Can't allocate memory for decompression object");
+        return NULL;
+    default:
+        zlib_error(self->zst, err, "while creating decompression object");
+        Py_DECREF(self);
+        return NULL;
+    }
+}
+
+static inline Py_ssize_t
+GzipReader_read_from_file(GzipReader *self)
+{
+
+    const uint8_t *current_pos = self->current_pos;
+    const uint8_t *buffer_end = self->buffer_end;
+    size_t remaining = buffer_end - current_pos;
+    if (remaining == self->buffer_size) {
+        /* Buffer is full but a new read request was issued. This is due to
+           the gzip header being bigger than the buffer. Enlarge the buffer
+           to accommodate the gzip header. */
+        size_t new_buffer_size = self->buffer_size * 2;
+        uint8_t *tmp_buffer = PyMem_Realloc(self->input_buffer, new_buffer_size);
+        if (tmp_buffer == NULL) {
+            PyErr_NoMemory();
+            return -1;
+        }
+        self->input_buffer = tmp_buffer;
+        self->buffer_size = new_buffer_size;
+    } else if (remaining > 0) {
+        memmove(self->input_buffer, current_pos, remaining);
+    }
+    uint8_t *input_buffer = self->input_buffer;
+    current_pos = input_buffer;
+    buffer_end = input_buffer + remaining;
+    size_t read_in_size = self->buffer_size - remaining;
+    PyObject *bufview = PyMemoryView_FromMemory(
+        (char *)buffer_end, read_in_size, MEMORYVIEW_READINTO_FLAGS);
+    if (bufview == NULL) {
+        return -1;
+    }
+    PyObject *new_size_obj = PyObject_CallMethod(self->fp, "readinto", "O", bufview);
+    Py_DECREF(bufview);
+    if (new_size_obj == NULL) {
+        return -1;
+    }
+    Py_ssize_t new_size = PyLong_AsSsize_t(new_size_obj);
+    Py_DECREF(new_size_obj);
+    if (new_size < 0) {
+        return -1;
+    }
+    if (new_size == 0) {
+        self->all_bytes_read = 1;
+    }
+    buffer_end += new_size;
+    self->current_pos = current_pos;
+    self->buffer_end = buffer_end;
+    return 0;
+}
+
+#define FTEXT 1
+#define FHCRC 2
+#define FEXTRA 4
+#define FNAME 8
+#define FCOMMENT 16
+
+static PyObject *BadGzipFile;  // Import BadGzipFile error for consistency
+
+static inline uint32_t load_u32_le(const void *address) {
+    #if PY_BIG_ENDIAN
+    uint8_t *mem = address;
+    return mem[0] | (mem[1] << 8) | (mem[2] << 16) | (mem[3] << 24);
+    #else
+    return *(uint32_t *)address;
+    #endif
+}
+
+static inline uint16_t load_u16_le(const void *address) {
+    #if PY_BIG_ENDIAN
+    uint8_t *mem = address;
+    return mem[0] | (mem[1] << 8);
+    #else
+    return *(uint16_t *)address;
+    #endif
+}
+
+static Py_ssize_t
+GzipReader_read_into_buffer(GzipReader *self, uint8_t *out_buffer, size_t out_buffer_size)
+{
+    Py_ssize_t bytes_written = 0;
+    /* Outer loop is the file read-in loop */
+    while (1) {
+        const uint8_t *current_pos = self->current_pos;
+        const uint8_t *buffer_end = self->buffer_end;
+        /* Inner loop fills the out buffer, with multiple gzip blocks if
+           necessary, while the GIL is released. The GIL is reacquired
+           before throwing errors and when the loop is exited. This makes
+           a big difference for BGZF format gzip blocks. */
+        PyThreadState *_save;
+        Py_UNBLOCK_THREADS
+        while(1) {
+            switch(self->stream_phase) {
+                size_t remaining;  // Must be before labels.
+            case GzipReader_HEADER:
+                remaining = buffer_end - current_pos;
+                if (remaining == 0 && self->all_bytes_read) {
+                    // Reached EOF
+                    self->_size = self->_pos;
+                    self->current_pos = current_pos;
+                    Py_BLOCK_THREADS;
+                    return bytes_written;
+                }
+                if (remaining < 10) {
+                    break;
+                }
+                uint8_t magic1 = current_pos[0];
+                uint8_t magic2 = current_pos[1];
+
+                if (!(magic1 == 0x1f && magic2 == 0x8b)) {
+                    Py_BLOCK_THREADS;
+                    PyObject *magic_obj = PyBytes_FromStringAndSize((char *)current_pos, 2);
+                    PyErr_Format(BadGzipFile,
+                                 "Not a gzipped file (%R)",
+                                 magic_obj);
+                    Py_DECREF(magic_obj);
+                    return -1;
+                }
+                uint8_t method = current_pos[2];
+                if (method != 8) {
+                    Py_BLOCK_THREADS;
+                    PyErr_SetString(BadGzipFile, "Unknown compression method");
+                    return -1;
+                }
+                uint8_t flags = current_pos[3];
+                self->_last_mtime = load_u32_le(current_pos + 4);
+                // Skip the XFL and OS fields.
+                const uint8_t *header_cursor = current_pos + 10;
+                if (flags & FEXTRA) {
+                    // Read the extra field and discard it.
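+                    // FEXTRA layout (RFC 1952): a little-endian uint16
+                    // length (XLEN) followed by XLEN bytes of subfield data.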
+                    if (header_cursor + 2 >= buffer_end) {
+                        break;
+                    }
+                    uint16_t flength = load_u16_le(header_cursor);
+                    header_cursor += 2;
+                    if (header_cursor + flength >= buffer_end) {
+                        break;
+                    }
+                    header_cursor += flength;
+                }
+                if (flags & FNAME) {
+                    header_cursor = memchr(header_cursor, 0, buffer_end - header_cursor);
+                    if (header_cursor == NULL) {
+                        break;
+                    }
+                    // Skip over the terminating zero byte.
+                    header_cursor += 1;
+                }
+                if (flags & FCOMMENT) {
+                    header_cursor = memchr(header_cursor, 0, buffer_end - header_cursor);
+                    if (header_cursor == NULL) {
+                        break;
+                    }
+                    // Skip over the terminating zero byte.
+                    header_cursor += 1;
+                }
+                if (flags & FHCRC) {
+                    if (header_cursor + 2 >= buffer_end) {
+                        break;
+                    }
+                    uint16_t header_crc = load_u16_le(header_cursor);
+                    uint16_t crc = zng_crc32_z(
+                        0, current_pos, header_cursor - current_pos) & 0xFFFF;
+                    if (header_crc != crc) {
+                        Py_BLOCK_THREADS;
+                        PyErr_Format(
+                            BadGzipFile,
+                            "Corrupted gzip header. Checksums do not "
+                            "match: %04x != %04x",
+                            crc, header_crc
+                        );
+                        return -1;
+                    }
+                    header_cursor += 2;
+                }
+                current_pos = header_cursor;
+                int reset_err = zng_inflateReset(&(self->zst));
+                if (reset_err != Z_OK) {
+                    Py_BLOCK_THREADS;
+                    zlib_error(self->zst, reset_err, "while resetting inflate stream");
+                    return -1;
+                }
+                self->crc = 0;
+                self->stream_phase = GzipReader_DEFLATE_BLOCK;
+            case GzipReader_DEFLATE_BLOCK:
+                self->zst.next_in = current_pos;
+                self->zst.avail_in = Py_MIN((buffer_end - current_pos), UINT32_MAX);
+                self->zst.next_out = out_buffer;
+                self->zst.avail_out = Py_MIN(out_buffer_size, UINT32_MAX);
+                int ret;
+                ret = zng_inflate(&self->zst, Z_SYNC_FLUSH);
+                switch (ret) {
+                case Z_OK:
+                case Z_BUF_ERROR:
+                case Z_STREAM_END:
+                    break;
+                case Z_MEM_ERROR:
+                    Py_BLOCK_THREADS;
+                    PyErr_SetString(PyExc_MemoryError,
+                                    "Out of memory while decompressing data");
+                    return -1;
+                default:
+                    Py_BLOCK_THREADS;
+                    zlib_error(self->zst, ret, "while decompressing data");
+                    return -1;
+                }
+                size_t current_bytes_written = self->zst.next_out - out_buffer;
+                self->crc = zng_crc32_z(self->crc, out_buffer, current_bytes_written);
+                bytes_written += current_bytes_written;
+                self->_pos += current_bytes_written;
+                out_buffer = self->zst.next_out;
+                out_buffer_size -= current_bytes_written;
+                current_pos = self->zst.next_in;
+                if (!(ret == Z_STREAM_END)) {
+                    if (out_buffer_size > 0) {
+                        if (current_pos == buffer_end) {
+                            // Need fresh bytes
+                            break;
+                        }
+                        // Not all input data decompressed.
+                        continue;
+                    }
+                    self->current_pos = current_pos;
+                    Py_BLOCK_THREADS;
+                    return bytes_written;
+                }
+                // Block done, check the trailer.
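+                // The member trailer (RFC 1952) holds CRC32 and ISIZE, the
+                // uncompressed size modulo 2^32, both little-endian.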
+                self->stream_phase = GzipReader_TRAILER;
+            case GzipReader_TRAILER:
+                if (buffer_end - current_pos < 8) {
+                    break;
+                }
+                uint32_t crc = load_u32_le(current_pos);
+                current_pos += 4;
+                if (crc != self->crc) {
+                    Py_BLOCK_THREADS;
+                    PyErr_Format(
+                        BadGzipFile,
+                        "CRC check failed %u != %u",
+                        crc, self->crc
+                    );
+                    return -1;
+                }
+                uint32_t length = load_u32_le(current_pos);
+                current_pos += 4;
+                if (length != (uint32_t)self->zst.total_out) {
+                    Py_BLOCK_THREADS;
+                    PyErr_SetString(BadGzipFile, "Incorrect length of data produced");
+                    return -1;
+                }
+                self->stream_phase = GzipReader_NULL_BYTES;
+            case GzipReader_NULL_BYTES:
+                // There may be NULL bytes between gzip members
+                while (current_pos < buffer_end && *current_pos == 0) {
+                    current_pos += 1;
+                }
+                if (current_pos == buffer_end) {
+                    /* Not all NULL bytes may have been read, refresh the buffer. */
+                    break;
+                }
+                self->stream_phase = GzipReader_HEADER;
+                continue;
+            default:
+                Py_UNREACHABLE();
+            }
+            break;
+        }
+        Py_BLOCK_THREADS;
+        // The input buffer is exhausted here. If all bytes have been read
+        // and the stream did not end between members, the file was
+        // truncated: raise an EOFError.
+        if (self->all_bytes_read) {
+            if (self->stream_phase == GzipReader_NULL_BYTES) {
+                self->_size = self->_pos;
+                self->current_pos = current_pos;
+                return bytes_written;
+            }
+            PyErr_SetString(
+                PyExc_EOFError,
+                "Compressed file ended before the end-of-stream marker was reached"
+            );
+            return -1;
+        }
+        self->current_pos = current_pos;
+        if (GzipReader_read_from_file(self) < 0) {
+            return -1;
+        }
+    }
+}
+
+static PyObject *
+GzipReader_readinto(GzipReader *self, PyObject *buffer_obj)
+{
+    Py_buffer view;
+    if (PyObject_GetBuffer(buffer_obj, &view, PyBUF_SIMPLE) < 0) {
+        return NULL;
+    }
+    uint8_t *buffer = view.buf;
+    size_t buffer_size = view.len;
+    ENTER_ZLIB(self);
+    Py_ssize_t written_size = GzipReader_read_into_buffer(self, buffer, buffer_size);
+    LEAVE_ZLIB(self);
+    PyBuffer_Release(&view);
+    if (written_size < 0) {
+        return NULL;
+    }
+    return PyLong_FromSsize_t((Py_ssize_t)written_size);
+}
+
+static PyObject *
+GzipReader_seek(GzipReader *self, PyObject *args, PyObject *kwargs)
+{
+    Py_ssize_t offset;
+    Py_ssize_t whence = SEEK_SET;
+    static char *keywords[] = {"offset", "whence", NULL};
+    static char *format = "n|n:GzipReader.seek";
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, format, keywords,
+                                     &offset, &whence)) {
+        return NULL;
+    }
+    // Recalculate offset as an absolute file position.
+    if (whence == SEEK_SET) {
+        ;
+    } else if (whence == SEEK_CUR) {
+        offset = self->_pos + offset;
+    } else if (whence == SEEK_END) {
+        // Seeking relative to EOF - we need to know the file's size.
+        if (self->_size < 0) {
+            size_t tmp_buffer_size = 8 * 1024;
+            uint8_t *tmp_buffer = PyMem_Malloc(tmp_buffer_size);
+            if (tmp_buffer == NULL) {
+                return PyErr_NoMemory();
+            }
+            while (1) {
+                /* Simply overwrite the tmp buffer over and over */
+                Py_ssize_t written_bytes = GzipReader_read_into_buffer(
+                    self, tmp_buffer, tmp_buffer_size
+                );
+                if (written_bytes < 0) {
+                    PyMem_Free(tmp_buffer);
+                    return NULL;
+                }
+                if (written_bytes == 0) {
+                    break;
+                }
+            }
+            assert(self->_size >= 0);
+            PyMem_Free(tmp_buffer);
+        }
+        offset = self->_size + offset;
+    } else {
+        PyErr_Format(
+            PyExc_ValueError,
+            "Invalid value for whence: %zd", whence
+        );
+        return NULL;
+    }
+
+    // Make it so that offset is the number of bytes to skip forward.
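+    // Deflate streams offer no random access: seeking backwards requires
+    // rewinding to the start of the file and decompressing forward again.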
+    if (offset < self->_pos) {
+        PyObject *seek_result = PyObject_CallMethod(self->fp, "seek", "n", 0);
+        if (seek_result == NULL) {
+            return NULL;
+        }
+        Py_DECREF(seek_result);
+        self->stream_phase = GzipReader_HEADER;
+        self->_pos = 0;
+        self->all_bytes_read = 0;
+        // Discard buffered input; it predates the rewind.
+        self->current_pos = self->input_buffer;
+        self->buffer_end = self->input_buffer;
+        int ret = zng_inflateReset(&self->zst);
+        if (ret != Z_OK) {
+            zlib_error(self->zst, ret, "while seeking");
+            return NULL;
+        }
+    } else {
+        offset -= self->_pos;
+    }
+
+    // Read and discard data until we reach the desired position.
+    if (offset > 0) {
+        Py_ssize_t tmp_buffer_size = 8 * 1024;
+        uint8_t *tmp_buffer = PyMem_Malloc(tmp_buffer_size);
+        if (tmp_buffer == NULL) {
+            return PyErr_NoMemory();
+        }
+        while (offset > 0) {
+            Py_ssize_t bytes_written = GzipReader_read_into_buffer(
+                self, tmp_buffer, Py_MIN(tmp_buffer_size, offset));
+            if (bytes_written < 0) {
+                PyMem_Free(tmp_buffer);
+                return NULL;
+            }
+            if (bytes_written == 0) {
+                break;
+            }
+            offset -= bytes_written;
+        }
+        PyMem_Free(tmp_buffer);
+    }
+    return PyLong_FromLongLong(self->_pos);
+}
+
+static PyObject *
+GzipReader_readall(GzipReader *self, PyObject *Py_UNUSED(ignore))
+{
+    /* Try to consume the entire stream without too much overallocation */
+    Py_ssize_t chunk_size = self->buffer_size * 4;
+    /* Rather than immediately creating a list, read one chunk first and
+       only create a list when more read operations are necessary. */
+    PyObject *first_chunk = PyBytes_FromStringAndSize(NULL, chunk_size);
+    if (first_chunk == NULL) {
+        return NULL;
+    }
+    ENTER_ZLIB(self);
+    Py_ssize_t written_size = GzipReader_read_into_buffer(
+        self, (uint8_t *)PyBytes_AS_STRING(first_chunk), chunk_size);
+    LEAVE_ZLIB(self);
+    if (written_size < 0) {
+        Py_DECREF(first_chunk);
+        return NULL;
+    }
+    if (written_size < chunk_size) {
+        if (_PyBytes_Resize(&first_chunk, written_size) < 0) {
+            return NULL;
+        }
+        return first_chunk;
+    }
+
+    PyObject *chunk_list = PyList_New(1);
+    if (chunk_list == NULL) {
+        return NULL;
+    }
+    PyList_SET_ITEM(chunk_list, 0, first_chunk);
+    while (1) {
+        PyObject *chunk = PyBytes_FromStringAndSize(NULL, chunk_size);
+        if (chunk == NULL) {
+            Py_DECREF(chunk_list);
+            return NULL;
+        }
+        ENTER_ZLIB(self);
+        written_size = GzipReader_read_into_buffer(
+            self, (uint8_t *)PyBytes_AS_STRING(chunk), chunk_size);
+        LEAVE_ZLIB(self);
+        if (written_size < 0) {
+            Py_DECREF(chunk);
+            Py_DECREF(chunk_list);
+            return NULL;
+        }
+        if (written_size == 0) {
+            Py_DECREF(chunk);
+            break;
+        }
+        if (_PyBytes_Resize(&chunk, written_size) < 0) {
+            Py_DECREF(chunk_list);
+            return NULL;
+        }
+        int ret = PyList_Append(chunk_list, chunk);
+        Py_DECREF(chunk);
+        if (ret < 0) {
+            Py_DECREF(chunk_list);
+            return NULL;
+        }
+    }
+    PyObject *empty_bytes = PyBytes_FromStringAndSize(NULL, 0);
+    if (empty_bytes == NULL) {
+        Py_DECREF(chunk_list);
+        return NULL;
+    }
+    PyObject *ret = _PyBytes_Join(empty_bytes, chunk_list);
+    Py_DECREF(empty_bytes);
+    Py_DECREF(chunk_list);
+    return ret;
+}
+
+static PyObject *
+GzipReader_read(GzipReader *self, PyObject *args)
+{
+    Py_ssize_t size = -1;
+    if (!PyArg_ParseTuple(args, "|n:GzipReader.read", &size)) {
+        return NULL;
+    }
+    if (size < 0) {
+        return GzipReader_readall(self, NULL);
+    }
+    if (size == 0) {
+        return PyBytes_FromStringAndSize(NULL, 0);
+    }
+    Py_ssize_t answer_size = Py_MIN((Py_ssize_t)self->buffer_size * 10, size);
+    PyObject *answer = PyBytes_FromStringAndSize(NULL, answer_size);
+    if (answer == NULL) {
+        return NULL;
+    }
+    ENTER_ZLIB(self);
+    Py_ssize_t written_bytes = GzipReader_read_into_buffer(
+        self, (uint8_t *)PyBytes_AS_STRING(answer), answer_size);
+    LEAVE_ZLIB(self);
+    if (written_bytes < 0) {
+        Py_DECREF(answer);
+        return NULL;
+    }
+    if (_PyBytes_Resize(&answer, written_bytes) < 0) {
+        return NULL;
+    }
+    return answer;
+}
+
+static PyObject *
+GzipReader_close(GzipReader *self, PyObject *Py_UNUSED(ignore)) {
+    if (!self->closed) {
+        self->closed = 1;
+    }
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+GzipReader_readable(GzipReader *self, PyObject *Py_UNUSED(ignore))
+{
+    Py_RETURN_TRUE;
+}
+
+static PyObject *
+GzipReader_writable(GzipReader *self, PyObject *Py_UNUSED(ignore))
+{
+    Py_RETURN_FALSE;
+}
+
+static PyObject *
+GzipReader_seekable(GzipReader *self, PyObject *Py_UNUSED(ignore)) {
+    return PyObject_CallMethod(self->fp, "seekable", NULL);
+}
+
+static PyObject *
+GzipReader_tell(GzipReader *self, PyObject *Py_UNUSED(ignore)) {
+    return PyLong_FromLongLong(self->_pos);
+}
+
+static PyObject *
+GzipReader_flush(GzipReader *self, PyObject *Py_UNUSED(ignore)) {
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+GzipReader_get_last_mtime(GzipReader *self, void *Py_UNUSED(closure))
+{
+    if (self->_last_mtime) {
+        return PyLong_FromUnsignedLong(self->_last_mtime);
+    }
+    Py_RETURN_NONE;
+}
+
+static PyObject *
+GzipReader_get_closed(GzipReader *self, void *Py_UNUSED(closure))
+{
+    return PyBool_FromLong(self->closed);
+}
+
+static PyMethodDef GzipReader_methods[] = {
+    {"readinto", (PyCFunction)GzipReader_readinto, METH_O, NULL},
+    {"readable", (PyCFunction)GzipReader_readable, METH_NOARGS, NULL},
+    {"writable", (PyCFunction)GzipReader_writable, METH_NOARGS, NULL},
+    {"seekable", (PyCFunction)GzipReader_seekable, METH_NOARGS, NULL},
+    {"tell", (PyCFunction)GzipReader_tell, METH_NOARGS, NULL},
+    {"seek", (PyCFunction)GzipReader_seek, METH_VARARGS | METH_KEYWORDS, NULL},
+    {"close", (PyCFunction)GzipReader_close, METH_NOARGS, NULL},
+    {"readall", (PyCFunction)GzipReader_readall, METH_NOARGS, NULL},
+    {"flush", (PyCFunction)GzipReader_flush, METH_NOARGS, NULL},
+    {"read", (PyCFunction)GzipReader_read, METH_VARARGS, NULL},
+    {NULL},
+};
+
+static PyGetSetDef GzipReader_properties[] = {
+    {"closed", (getter)GzipReader_get_closed, NULL, NULL, NULL},
+    {"_last_mtime", (getter)GzipReader_get_last_mtime, NULL, NULL, NULL},
+    {NULL},
+};
+
+static PyTypeObject GzipReader_Type = {
+    .tp_name = "zlib_ng._GzipReader",
+    .tp_basicsize = sizeof(GzipReader),
+    .tp_flags = Py_TPFLAGS_DEFAULT,
+    .tp_dealloc = (destructor)GzipReader_dealloc,
+    .tp_new = (newfunc)(GzipReader__new__),
+    .tp_doc = GzipReader__new____doc__,
+    .tp_methods = GzipReader_methods,
+    .tp_getset = GzipReader_properties,
+};
+
+
 static PyMethodDef zlib_methods[] = {
     ZLIB_ADLER32_METHODDEF,
     ZLIB_COMPRESS_METHODDEF,
     ZLIB_COMPRESSOBJ_METHODDEF,
     ZLIB_CRC32_METHODDEF,
+    ZLIB_CRC32_COMBINE_METHODDEF,
     ZLIB_DECOMPRESS_METHODDEF,
     ZLIB_DECOMPRESSOBJ_METHODDEF,
     {NULL, NULL}
@@ -2020,12 +2991,41 @@ PyInit_zlib_ng(void)
     Py_INCREF(ZlibDecompressorType_obj);
     PyModule_AddObject(m, "_ZlibDecompressor", ZlibDecompressorType_obj);
 
+    if (PyType_Ready(&GzipReader_Type) != 0) {
+        return NULL;
+    }
+    Py_INCREF(&GzipReader_Type);
+    if (PyModule_AddObject(m, "_GzipReader", (PyObject *)&GzipReader_Type) < 0) {
+        return NULL;
+    }
+
+    if (PyType_Ready(&ParallelCompress_Type) != 0) {
+        return NULL;
+    }
+    Py_INCREF(&ParallelCompress_Type);
+    if (PyModule_AddObject(m, "_ParallelCompress",
+                           (PyObject *)&ParallelCompress_Type) < 0) {
+        return NULL;
+    }
+
     ZlibError = PyErr_NewException("zlib_ng.error", NULL, NULL);
     if (ZlibError == NULL) {
         return NULL;
     }
     Py_INCREF(ZlibError);
     PyModule_AddObject(m, "error", ZlibError);
+
+    PyObject *gzip_module = PyImport_ImportModule("gzip");
+    if (gzip_module == NULL) {
+        return NULL;
+    }
+
+    BadGzipFile = PyObject_GetAttrString(gzip_module, "BadGzipFile");
+    Py_DECREF(gzip_module);
+    if (BadGzipFile == NULL) {
+        return NULL;
+    }
+
     PyModule_AddIntMacro(m, MAX_WBITS);
     PyModule_AddIntMacro(m, DEFLATED);
     PyModule_AddIntMacro(m, DEF_MEM_LEVEL);
diff --git a/tests/data/test.fastq.bgzip.gz b/tests/data/test.fastq.bgzip.gz
new file mode 100644
index 0000000..8ef5eb5
Binary files /dev/null and b/tests/data/test.fastq.bgzip.gz differ
diff --git a/tests/test_gzip_compliance.py b/tests/test_gzip_compliance.py
index f51c867..d938966 100644
--- a/tests/test_gzip_compliance.py
+++ b/tests/test_gzip_compliance.py
@@ -613,8 +613,7 @@ def test_read_truncated(self):
         with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
             self.assertRaises(EOFError, f.read)
         with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
-            self.assertEqual(f.read(len(data)), data)
-            self.assertRaises(EOFError, f.read, 1)
+            self.assertRaises(EOFError, f.read)
         # Incomplete 10-byte header.
         for i in range(2, 10):
             with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
@@ -628,13 +627,6 @@ def test_read_with_extra(self):
         with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
             self.assertEqual(f.read(), b'Test')
 
-    def test_prepend_error(self):
-        # See issue #20875
-        with gzip.open(self.filename, "wb") as f:
-            f.write(data1)
-        with gzip.open(self.filename, "rb") as f:
-            f._buffer.raw._fp.prepend()
-
     def test_issue44439(self):
         q = array.array('Q', [1, 2, 3, 4, 5])
         LENGTH = len(q) * q.itemsize
diff --git a/tests/test_gzip_ng.py b/tests/test_gzip_ng.py
index 0a2ca86..a751d8e 100644
--- a/tests/test_gzip_ng.py
+++ b/tests/test_gzip_ng.py
@@ -14,7 +14,6 @@
 import os
 import re
 import shutil
-import subprocess
 import sys
 import tempfile
 import zlib
@@ -23,26 +22,11 @@
 
 import pytest
 
-from zlib_ng import gzip_ng
+from zlib_ng import gzip_ng, zlib_ng
 
 DATA = b'This is a simple test with gzip_ng'
 COMPRESSED_DATA = gzip.compress(DATA)
 TEST_FILE = str((Path(__file__).parent / "data" / "test.fastq.gz"))
-PYPY = sys.implementation.name == "pypy"
-
-
-def run_gzip_ng(*args, stdin=None):
-    """Calling gzip_ng externally seems to solve some issues on PyPy where
-    files would not be written properly when gzip_ng.main() was called. This is
-    probably due to some out of order execution that PyPy tries to pull.
- Running the process externally is detrimental to the coverage report, - so this is only done for PyPy.""" - process = subprocess.Popen(["python", "-m", "zlib_ng.gzip_ng", *args], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE) - - return process.communicate(stdin) def test_repr(): @@ -121,12 +105,9 @@ def test_decompress_infile_outfile(tmp_path, capsysbinary): def test_compress_infile_outfile(tmp_path, capsysbinary): test_file = tmp_path / "test" test_file.write_bytes(DATA) - if PYPY: - out, err = run_gzip_ng(str(test_file)) - else: - sys.argv = ['', str(test_file)] - gzip_ng.main() - out, err = capsysbinary.readouterr() + sys.argv = ['', str(test_file)] + gzip_ng.main() + out, err = capsysbinary.readouterr() out_file = test_file.with_suffix(".gz") assert err == b'' assert out == b'' @@ -189,12 +170,9 @@ def test_compress_infile_out_file(tmp_path, capsysbinary): test.write_bytes(DATA) out_file = tmp_path / "compressed.gz" args = ['-o', str(out_file), str(test)] - if PYPY: - out, err = run_gzip_ng(*args) - else: - sys.argv = ['', *args] - gzip_ng.main() - out, err = capsysbinary.readouterr() + sys.argv = ['', *args] + gzip_ng.main() + out, err = capsysbinary.readouterr() assert gzip.decompress(out_file.read_bytes()) == DATA assert err == b'' assert out == b'' @@ -206,12 +184,9 @@ def test_compress_infile_out_file_force(tmp_path, capsysbinary): out_file = tmp_path / "compressed.gz" out_file.touch() args = ['-f', '-o', str(out_file), str(test)] - if PYPY: - out, err = run_gzip_ng(*args) - else: - sys.argv = ['', *args] - gzip_ng.main() - out, err = capsysbinary.readouterr() + sys.argv = ['', *args] + gzip_ng.main() + out, err = capsysbinary.readouterr() assert gzip.decompress(out_file.read_bytes()) == DATA assert err == b'' assert out == b'' @@ -254,14 +229,11 @@ def test_compress_infile_out_file_inmplicit_name_prompt_accept( test.write_bytes(DATA) out_file = tmp_path / "test.gz" out_file.touch() - if PYPY: - out, err = run_gzip_ng(str(test), stdin=b"y\n") - else: - sys.argv = ['', str(test)] - mock_stdin = io.BytesIO(b"y") - sys.stdin = io.TextIOWrapper(mock_stdin) - gzip_ng.main() - out, err = capsysbinary.readouterr() + sys.argv = ['', str(test)] + mock_stdin = io.BytesIO(b"y") + sys.stdin = io.TextIOWrapper(mock_stdin) + gzip_ng.main() + out, err = capsysbinary.readouterr() assert b"already exists; do you wish to overwrite" in out assert err == b"" assert gzip.decompress(out_file.read_bytes()) == DATA @@ -271,13 +243,9 @@ def test_compress_infile_out_file_no_name(tmp_path, capsysbinary): test = tmp_path / "test" test.write_bytes(DATA) out_file = tmp_path / "compressed.gz" - args = ['-n', '-o', str(out_file), str(test)] - if PYPY: - out, err = run_gzip_ng(*args) - else: - sys.argv = ['', '-n', '-o', str(out_file), str(test)] - gzip_ng.main() - out, err = capsysbinary.readouterr() + sys.argv = ['', '-n', '-o', str(out_file), str(test)] + gzip_ng.main() + out, err = capsysbinary.readouterr() output = out_file.read_bytes() assert gzip.decompress(output) == DATA assert err == b'' @@ -405,9 +373,76 @@ def test_truncated_header(trunc): gzip_ng.decompress(trunc) +def test_very_long_header_in_data(): + # header with a very long filename. + header = (b"\x1f\x8b\x08\x08\x00\x00\x00\x00\x00\xff" + 256 * 1024 * b"A" + + b"\x00") + compressed = header + zlib_ng.compress(b"", 3, -15) + 8 * b"\00" + assert gzip_ng.decompress(compressed) == b"" + + +def test_very_long_header_in_file(): + # header with a very long filename. 
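+    # The filename is longer than the reader's internal buffer, so the
+    # header can only be parsed after the buffer has been enlarged.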
+    header = (b"\x1f\x8b\x08\x08\x00\x00\x00\x00\x00\xff" +
+              gzip_ng.READ_BUFFER_SIZE * 2 * b"A" +
+              b"\x00")
+    compressed = header + zlib_ng.compress(b"", 3, -15) + 8 * b"\00"
+    f = io.BytesIO(compressed)
+    with gzip_ng.open(f) as gzip_file:
+        assert gzip_file.read() == b""
+
+
 def test_concatenated_gzip():
     concat = Path(__file__).parent / "data" / "concatenated.fastq.gz"
     data = gzip.decompress(concat.read_bytes())
     with gzip_ng.open(concat, "rb") as gzip_ng_h:
         result = gzip_ng_h.read()
     assert data == result
+
+
+def test_seek():
+    from io import SEEK_CUR, SEEK_END, SEEK_SET
+    with tempfile.NamedTemporaryFile("wb", delete=False) as tmpfile:
+        tmpfile.write(gzip.compress(b"X" * 500 + b"A" + b"X" * 499))
+        tmpfile.write(gzip.compress(b"X" * 500 + b"B" + b"X" * 499))
+        tmpfile.write(gzip.compress(b"X" * 500 + b"C" + b"X" * 499))
+        tmpfile.write(gzip.compress(b"X" * 500 + b"D" + b"X" * 499))
+    with gzip_ng.open(tmpfile.name, "rb") as gzip_file:
+        # Start testing forward seek
+        gzip_file.seek(500)
+        assert gzip_file.read(1) == b"A"
+        gzip_file.seek(1500)
+        assert gzip_file.read(1) == b"B"
+        # Test reverse
+        gzip_file.seek(500)
+        assert gzip_file.read(1) == b"A"
+        # Again, but with explicit SEEK_SET
+        gzip_file.seek(500, SEEK_SET)
+        assert gzip_file.read(1) == b"A"
+        gzip_file.seek(1500, SEEK_SET)
+        assert gzip_file.read(1) == b"B"
+        gzip_file.seek(500, SEEK_SET)
+        assert gzip_file.read(1) == b"A"
+        # Seeking from current position
+        gzip_file.seek(500)
+        gzip_file.seek(2000, SEEK_CUR)
+        assert gzip_file.read(1) == b"C"
+        gzip_file.seek(-1001, SEEK_CUR)
+        assert gzip_file.read(1) == b"B"
+        # Seeking from end
+        # Any positive number should end up at the end
+        gzip_file.seek(200, SEEK_END)
+        assert gzip_file.read(1) == b""
+        gzip_file.seek(-1500, SEEK_END)
+        assert gzip_file.read(1) == b"C"
+    os.remove(tmpfile.name)
+
+
+def test_bgzip():
+    bgzip_file = Path(__file__).parent / "data" / "test.fastq.bgzip.gz"
+    gzip_file = Path(__file__).parent / "data" / "test.fastq.gz"
+    with gzip_ng.open(bgzip_file, "rb") as bgz:
+        bgz_data = bgz.read()
+    with gzip_ng.open(gzip_file, "rb") as gz:
+        gz_data = gz.read()
+    assert bgz_data == gz_data
diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py
new file mode 100644
index 0000000..871e3ae
--- /dev/null
+++ b/tests/test_gzip_ng_threaded.py
@@ -0,0 +1,192 @@
+# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022
+# Python Software Foundation; All Rights Reserved
+
+# This file is part of python-zlib-ng which is distributed under the
+# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2.
+
+import gzip
+import io
+import itertools
+import os
+import tempfile
+from pathlib import Path
+
+import pytest
+
+from zlib_ng import gzip_ng_threaded, zlib_ng
+
+TEST_FILE = str((Path(__file__).parent / "data" / "test.fastq.gz"))
+
+
+def test_threaded_read():
+    with gzip_ng_threaded.open(TEST_FILE, "rb") as thread_f:
+        thread_data = thread_f.read()
+    with gzip.open(TEST_FILE, "rb") as f:
+        data = f.read()
+    assert thread_data == data
+
+
+@pytest.mark.parametrize(["mode", "threads"],
+                         itertools.product(["wb", "wt"], [1, 3, -1]))
+def test_threaded_write(mode, threads):
+    with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
+        # Use a small block size to simulate many writes.
+        with gzip_ng_threaded.open(tmp, mode, threads=threads,
+                                   block_size=8*1024) as out_file:
+            gzip_open_mode = "rb" if "b" in mode else "rt"
+            with gzip.open(TEST_FILE, gzip_open_mode) as in_file:
+                while True:
+                    block = in_file.read(128 * 1024)
+                    if not block:
+                        break
+                    out_file.write(block)
+    with gzip.open(TEST_FILE, "rt") as test_file:
+        test_data = test_file.read()
+    with gzip.open(tmp.name, "rt") as test_out:
+        out_data = test_out.read()
+    assert test_data == out_data
+
+
+def test_threaded_open_no_threads():
+    with tempfile.TemporaryFile("rb") as tmp:
+        klass = gzip_ng_threaded.open(tmp, "rb", threads=0)
+        # gzip_ng.GzipNGFile inherits from gzip.GzipFile
+        assert isinstance(klass, gzip.GzipFile)
+
+
+def test_threaded_open_not_a_file_or_pathlike():
+    i_am_a_tuple = (1, 2, 3)
+    with pytest.raises(TypeError) as error:
+        gzip_ng_threaded.open(i_am_a_tuple)
+    error.match("str")
+    error.match("bytes")
+    error.match("file")
+
+
+# Test whether threaded readers and writers throw an error rather than hang
+# indefinitely.
+
+@pytest.mark.timeout(5)
+def test_threaded_read_error():
+    with open(TEST_FILE, "rb") as f:
+        data = f.read()
+    truncated_data = data[:-8]
+    with gzip_ng_threaded.open(io.BytesIO(truncated_data), "rb") as tr_f:
+        with pytest.raises(EOFError):
+            tr_f.read()
+
+
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("threads", [1, 3])
+def test_threaded_write_oversized_block_no_error(threads):
+    # Random bytes are incompressible, and therefore are guaranteed to
+    # trigger a buffer overflow when larger than block size unless handled
+    # correctly.
+    data = os.urandom(1024 * 63)  # not a multiple of block_size
+    with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp:
+        with gzip_ng_threaded.open(
+                tmp, "wb", compresslevel=3, threads=threads,
+                block_size=8 * 1024
+        ) as writer:
+            writer.write(data)
+    with gzip.open(tmp.name, "rb") as gzipped:
+        decompressed = gzipped.read()
+    assert data == decompressed
+
+
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("threads", [1, 3])
+def test_threaded_write_error(threads):
+    f = gzip_ng_threaded._ThreadedGzipWriter(
+        io.BytesIO(), level=3,
+        threads=threads, block_size=8 * 1024)
+    # Bypass the write method, which should not allow blocks larger than
+    # block_size.
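+    # 64 KiB of incompressible data cannot fit in an output buffer sized for
+    # 8 KiB blocks, so close() should re-raise the worker's OverflowError.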
+ f.input_queues[0].put((os.urandom(1024 * 64), b"")) + with pytest.raises(OverflowError) as error: + f.close() + error.match("Compressed output exceeds buffer size") + + +def test_close_reader(): + tmp = io.BytesIO(Path(TEST_FILE).read_bytes()) + f = gzip_ng_threaded._ThreadedGzipReader(tmp, "rb") + f.close() + assert f.closed + # Make sure double closing does not raise errors + f.close() + + +@pytest.mark.parametrize("threads", [1, 3]) +def test_close_writer(threads): + f = gzip_ng_threaded._ThreadedGzipWriter( + io.BytesIO(), threads=threads) + f.close() + assert f.closed + # Make sure double closing does not raise errors + f.close() + + +def test_reader_not_writable(): + with gzip_ng_threaded.open(TEST_FILE, "rb") as f: + assert not f.writable() + + +def test_writer_not_readable(): + with gzip_ng_threaded.open(io.BytesIO(), "wb") as f: + assert not f.readable() + + +def test_writer_wrong_level(): + with tempfile.NamedTemporaryFile("wb") as tmp: + with pytest.raises(zlib_ng.error) as error: + gzip_ng_threaded.open(tmp.name, mode="wb", compresslevel=42) + error.match("Bad compression level") + + +def test_writer_too_low_threads(): + with pytest.raises(ValueError) as error: + gzip_ng_threaded._ThreadedGzipWriter(io.BytesIO(), threads=0) + error.match("threads") + error.match("at least 1") + + +def test_reader_read_after_close(): + with open(TEST_FILE, "rb") as test_f: + f = gzip_ng_threaded._ThreadedGzipReader(test_f) + f.close() + with pytest.raises(ValueError) as error: + f.read(1024) + error.match("closed") + + +@pytest.mark.parametrize("threads", [1, 3]) +def test_writer_write_after_close(threads): + f = gzip_ng_threaded._ThreadedGzipWriter(io.BytesIO(), threads=threads) + f.close() + with pytest.raises(ValueError) as error: + f.write(b"abc") + error.match("closed") + + +def test_gzip_ng_threaded_append(tmp_path): + test_file = tmp_path / "test.txt.gz" + with gzip_ng_threaded.open(test_file, "wb") as f: + f.write(b"AB") + with gzip_ng_threaded.open(test_file, mode="ab") as f: + f.write(b"CD") + with gzip.open(test_file, "rb") as f: + contents = f.read() + assert contents == b"ABCD" + + +def test_gzip_ng_threaded_append_text_mode(tmp_path): + test_file = tmp_path / "test.txt.gz" + with gzip_ng_threaded.open(test_file, "wt") as f: + f.write("AB") + with gzip_ng_threaded.open(test_file, mode="at") as f: + f.write("CD") + with gzip.open(test_file, "rt") as f: + contents = f.read() + assert contents == "ABCD" diff --git a/tests/test_zlib_compliance.py b/tests/test_zlib_compliance.py index 1f8ff53..3d5ab65 100644 --- a/tests/test_zlib_compliance.py +++ b/tests/test_zlib_compliance.py @@ -108,6 +108,17 @@ def test_crc32_adler32_unsigned(self): self.assertEqual(zlib.adler32(foo + foo), 3573550353) self.assertEqual(zlib.adler32(b'spam'), 72286642) + def test_crc32_combine(self): + foo = b'abcdefghijklmnop' + self.assertEqual(zlib.crc32_combine(0, 0, 0), 0) + self.assertEqual(zlib.crc32_combine(1, 0, 0), 1) + self.assertEqual(zlib.crc32_combine(432, 0, 0), 432) + self.assertEqual( + zlib.crc32_combine( + zlib.crc32(foo), zlib.crc32(foo), len(foo)), + zlib.crc32(foo + foo) + ) + def test_same_as_binascii_crc32(self): foo = b'abcdefghijklmnop' crc = 2486878355 diff --git a/tox.ini b/tox.ini index 5085404..06b7f91 100644 --- a/tox.ini +++ b/tox.ini @@ -7,9 +7,12 @@ isolated_build=True [testenv] deps=pytest + pytest-timeout coverage passenv= PYTHON_ZLIB_NG_LINK_DYNAMIC +setenv= + PYTHONDEVMODE=1 commands = # Create HTML coverage report for humans and xml coverage report for external services. 
 coverage run --branch --source=zlib_ng -m pytest tests
@@ -17,6 +20,25 @@ commands =
     coverage html -i
     coverage xml -i
 
+[testenv:asan]
+setenv=
+    PYTHONDEVMODE=1
+    PYTHONMALLOC=malloc
+    CFLAGS=-lasan -fsanitize=address -fno-omit-frame-pointer
+allowlist_externals=bash
+commands=
+    bash -c 'export LD_PRELOAD=$(gcc -print-file-name=libasan.so) && printenv LD_PRELOAD && python -c "from zlib_ng import zlib_ng" && pytest tests'
+
+[testenv:compliance]
+deps=pytest
+commands=
+    pytest -v tests/test_zlib_compliance.py tests/test_gzip_compliance.py
+
+[testenv:compatibility]
+deps=pytest
+commands=
+    pytest tests/test_zlib_ng.py
+
 [testenv:lint]
 deps=flake8
      flake8-import-order
@@ -56,21 +78,25 @@ commands=
 
 [testenv:benchmark-all]
 deps=
+setenv =
 commands=
     python ./benchmark_scripts/benchmark.py --all
 
 [testenv:benchmark-functions]
 deps=
+setenv =
 commands=
     python ./benchmark_scripts/benchmark.py --functions
 
 [testenv:benchmark-gzip]
 deps=
+setenv =
 commands=
     python ./benchmark_scripts/benchmark.py --gzip
 
 [testenv:benchmark-checksums]
 deps=
+setenv =
 commands=
     python ./benchmark_scripts/benchmark.py --checksums