diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e38a722..bdd0f54 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,12 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +version 0.5.1 +----------------- ++ Fix a bug where ``gzip_ng_threaded.open`` could + cause a hang when the program exited and the program was not used with a + context manager. + version 0.5.0 ----------------- + Wheels are now build for MacOS arm64 architectures. diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 5b8a9ff..7fa7249 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -98,7 +98,8 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.exception = None self.buffer = io.BytesIO() self.block_size = block_size - self.worker = threading.Thread(target=self._decompress) + # Using a daemon thread prevents programs freezing on error. + self.worker = threading.Thread(target=self._decompress, daemon=True) self._closed = False self.running = True self.worker.start() @@ -231,9 +232,10 @@ def __init__(self, queue.Queue(queue_size) for _ in range(threads)] self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ queue.Queue(queue_size) for _ in range(threads)] - self.output_worker = threading.Thread(target=self._write) + # Using daemon threads prevents a program freezing on error. + self.output_worker = threading.Thread(target=self._write, daemon=True) self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,)) + threading.Thread(target=self._compress, args=(i,), daemon=True) for i in range(threads) ] elif threads == 1: @@ -241,7 +243,7 @@ def __init__(self, self.output_queues = [] self.compression_workers = [] self.output_worker = threading.Thread( - target=self._compress_and_write) + target=self._compress_and_write, daemon=True) else: raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py index 1032a67..7ed06de 100644 --- a/tests/test_gzip_ng_threaded.py +++ b/tests/test_gzip_ng_threaded.py @@ -9,6 +9,8 @@ import io import itertools import os +import subprocess +import sys import tempfile from pathlib import Path @@ -209,3 +211,22 @@ def test_threaded_writer_does_not_close_stream(): assert not test_stream.closed test_stream.seek(0) assert gzip.decompress(test_stream.read()) == b"thisisatest" + + +@pytest.mark.timeout(5) +@pytest.mark.parametrize( + ["mode", "threads"], itertools.product(["rb", "wb"], [1, 2])) +def test_threaded_program_can_exit_on_error(tmp_path, mode, threads): + program = tmp_path / "no_context_manager.py" + test_file = tmp_path / "output.gz" + # Write 40 mb input data to saturate read buffer. Because of the repetitive + # nature the resulting gzip file is very small (~40 KiB). + test_file.write_bytes(gzip.compress(b"test" * (10 * 1024 * 1024))) + with open(program, "wt") as f: + f.write("from zlib_ng import gzip_ng_threaded\n") + f.write( + f"f = gzip_ng_threaded.open('{test_file}', " + f"mode='{mode}', threads={threads})\n" + ) + f.write("raise Exception('Error')\n") + subprocess.run([sys.executable, str(program)])