From 123b0bf3ef8680a3faf5b17af971f4c626dd3b1b Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 18 Mar 2025 15:01:27 +0100 Subject: [PATCH 1/8] Start on bgzip module threaded reading --- src/isal/_bgzipmodule.c | 33 ++++++++++ src/isal/igzip_threaded.py | 132 +++++++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 src/isal/_bgzipmodule.c diff --git a/src/isal/_bgzipmodule.c b/src/isal/_bgzipmodule.c new file mode 100644 index 00000000..5b5491d0 --- /dev/null +++ b/src/isal/_bgzipmodule.c @@ -0,0 +1,33 @@ +/* +Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022 +Python Software Foundation; All Rights Reserved + +This file is part of python-isal which is distributed under the +PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2. + +This file is not originally from the CPython distribution. But it does +contain mostly example code from the Python docs. Also dual licensing just +for this one file seemed silly. +*/ + +#define PY_SSIZE_T_CLEAN +#include "Python.h" + +static struct PyModuleDef _bgzip_module = { + PyModuleDef_HEAD_INIT, + "_bgzip", /* name of module */ + NULL, /* module documentation, may be NULL */ + -1, + NULL, +}; + +PyMODINIT_FUNC +PyInit__bgzip(void) +{ + PyObject *m = PyModule_Create(&_bgzip_module); + if (m == NULL) { + return NULL; + } + return m; +} diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 7f1c94fc..8cce2650 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -167,6 +167,138 @@ def closed(self) -> bool: return self._closed +class ThreadedBGZipReader(io.RawIOBase): + def __init__(self, filename, threads=2, queue_size=2, block_size=1024 * 1024): + if threads < 2: + raise RuntimeError("_ThreadedGzipReader class handles that use case.") + self.raw, self.closefd = open_as_binary_stream(filename, "rb") + + self.lock = threading.Lock() + self.pos = 0 + self.read_file = False + self.input_queues = [queue.Queue(queue_size) for _ in range(threads)] + self.output_queues = [queue.Queue(queue_size) for _ in range(threads)] + self.eof = False + self.exception = None + self.buffer = io.BytesIO() + self.block_size = block_size + self.input_worker = threading.Thread(target=self._read_input) + self.output_workers = [threading.Thread(target=self._decompress, args=(i,)) for i in range(threads)] + self._closed = False + self.running = True + self._calling_thread = threading.current_thread() + self.input_worker.start() + for worker in self.output_workers: + worker.start() + + def _check_closed(self, msg=None): + if self._closed: + raise ValueError("I/O operation on closed file") + + def _read_input(self): + block_size = self.block_size + number_of_queues = len(self.output_queues) + input_index = 0 + previous_block = b"" + while self.running and self._calling_thread.is_alive(): + to_read = block_size - len(previous_block) + block = previous_block + self.raw.read(to_read) + blocks_end = find_last_bgzip_end(block) + compressed = block[:blocks_end] + previous_block = block[blocks_end:] + input_queue = self.input_queues[input_index] + while self.running and self._calling_thread.is_alive(): + try: + input_queue.put(compressed, timeout=0.05) + except queue.Full: + pass + input_index += 1 + input_index %= number_of_queues + + def _decompress(self, index: int): + input_queue = self.input_queues[index] + output_queue = self.output_queues[index] + while self.running and self._calling_thread.is_alive(): + try: + input_data = input_queue.get(timeout=0.05) + except queue.Empty: + if not self.input_worker.is_alive(): + return + continue + try: + decompressed = isal_zlib._GzipReader(input_data).read() + except Exception as e: + with self.lock: + self.exception = e + return + input_queue.task_done() + while self.running and self._calling_thread.is_alive(): + try: + output_queue.put(decompressed, timeout=0.05) + break + except queue.Full: + pass + + def _set_error_and_empty_queue(self, error, q): + with self.lock: + self.exception = error + # Abort everything and empty the queue + self.running = False + while True: + try: + _ = q.get(timeout=0.05) + q.task_done() + except queue.Empty: + return + + def readinto(self, b): + self._check_closed() + result = self.buffer.readinto(b) + index = 0 + number_of_queues = len(self.output_queues) + if result == 0: + output_queue = self.output_queues[index] + while True: + try: + data_from_queue = output_queue.get(timeout=0.01) + index += 1 + index %= number_of_queues + break + except queue.Empty: + if self.exception: + raise self.exception + if not any(worker.is_alive() for worker in self.output_workers): + # EOF reached + return 0 + self.buffer = io.BytesIO(data_from_queue) + result = self.buffer.readinto(b) + self.pos += result + return result + + def readable(self) -> bool: + return True + + def tell(self) -> int: + self._check_closed() + return self.pos + + def close(self) -> None: + if self._closed: + return + self.running = False + self.input_worker.join() + for worker in self.output_workers: + worker.join() + self.fileobj.close() + if self.closefd: + self.raw.close() + self._closed = True + + @property + def closed(self) -> bool: + return self._closed + + class FlushableBufferedWriter(io.BufferedWriter): def flush(self): super().flush() From b59e7953ffaf3237d38d1d6a89034e4cc5f0c743 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 18 Mar 2025 15:41:06 +0100 Subject: [PATCH 2/8] Implement bgzip skipper --- setup.py | 1 + src/isal/_bgzipmodule.c | 93 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 89 insertions(+), 5 deletions(-) diff --git a/setup.py b/setup.py index 55c89496..e8bd66ca 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,7 @@ Extension("isal.isal_zlib", ["src/isal/isal_zlibmodule.c"]), Extension("isal.igzip_lib", ["src/isal/igzip_libmodule.c"]), Extension("isal._isal", ["src/isal/_isalmodule.c"]), + Extension("isal._bgzip", ["src/isal/_bgzipmodule.c"]), ] diff --git a/src/isal/_bgzipmodule.c b/src/isal/_bgzipmodule.c index 5b5491d0..d6df0c62 100644 --- a/src/isal/_bgzipmodule.c +++ b/src/isal/_bgzipmodule.c @@ -12,14 +12,97 @@ for this one file seemed silly. */ #define PY_SSIZE_T_CLEAN -#include "Python.h" +#include +#include + +#define FEXTRA 4 + +static inline uint16_t load_u16_le(const void *address) { + #if PY_BIG_ENDIAN + uint8_t *mem = address; + return mem[0] | (mem[1] << 8); + #else + return *(uint16_t *)address; + #endif +} + +static PyObject *find_last_bgzip_end(PyObject *module, PyObject *buffer_obj) { + Py_buffer buf; + int ret = PyObject_GetBuffer(buffer_obj, &buf, PyBUF_SIMPLE); + if (ret == -1) { + return NULL; + } + const uint8_t *data = buf.buf; + Py_ssize_t data_length = buf.len; + const uint8_t *data_end = data + data_length; + const uint8_t *cursor = data; + + Py_ssize_t answer = 0; + while (true) { + if (data_end - cursor < 18) { + break; + } + uint8_t magic1 = cursor[0]; + uint8_t magic2 = cursor[1]; + if (magic1 != 31 || magic2 != 139) { + PyErr_Format(PyExc_ValueError, + "Incorrect gzip magic: %x, %x", magic1, magic2); + return NULL; + } + uint8_t method = cursor[2]; + if (method != 8) { + PyErr_Format( + PyExc_ValueError, + "Incorrect method: %x", method + ); + return NULL; + } + uint8_t flags = cursor[3]; + if (flags != FEXTRA) { + PyErr_SetString( + PyExc_NotImplementedError, + "Only bgzip files with only FEXTRA flag set are supported." + ); + return NULL; + } + uint16_t xlen = load_u16_le(cursor + 10); + if (xlen != 6) { + PyErr_SetString( + PyExc_NotImplementedError, + "Only bgzip files with one extra field are supported." + ); + return NULL; + } + uint8_t si1 = cursor[12]; + uint8_t si2 = cursor[13]; + uint16_t subfield_length = load_u16_le(cursor + 14); + if (si1 != 66 || si2 != 67 || subfield_length != 2) { + PyErr_Format( + PyExc_ValueError, + "Incorrectly formatted magic and subfield length, " + "expected 66, 67, 2 got %d, %d, %d", + si1, si2, subfield_length + ); + return NULL; + } + uint16_t block_size = load_u16_le(cursor + 16); + size_t actual_block_size = block_size + 1; + cursor += actual_block_size; + } + return PyLong_FromSsize_t(answer); +} + +static PyMethodDef _bgzip_methods[] = { + {"find_last_bgzip_end", find_last_bgzip_end, METH_O, NULL}, + {NULL}, +}; static struct PyModuleDef _bgzip_module = { PyModuleDef_HEAD_INIT, - "_bgzip", /* name of module */ - NULL, /* module documentation, may be NULL */ - -1, - NULL, + .m_name = "_bgzip", + .m_doc = NULL, + .m_size = -1, + .m_methods = _bgzip_methods, }; PyMODINIT_FUNC From 8a81c58ad775939d516f7be3b90a468d4e6523d6 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 18 Mar 2025 16:03:11 +0100 Subject: [PATCH 3/8] Simplify bgzip header check --- src/isal/_bgzipmodule.c | 52 ++++++++++++++++------------------------- src/isal/isa-l | 2 +- 2 files changed, 21 insertions(+), 33 deletions(-) diff --git a/src/isal/_bgzipmodule.c b/src/isal/_bgzipmodule.c index d6df0c62..0e602897 100644 --- a/src/isal/_bgzipmodule.c +++ b/src/isal/_bgzipmodule.c @@ -44,44 +44,32 @@ static PyObject *find_last_bgzip_end(PyObject *module, PyObject *buffer_obj) { } uint8_t magic1 = cursor[0]; uint8_t magic2 = cursor[1]; - if (magic1 != 31 || magic2 != 139) { - PyErr_Format(PyExc_ValueError, - "Incorrect gzip magic: %x, %x", magic1, magic2); - return NULL; - } uint8_t method = cursor[2]; - if (method != 8) { - PyErr_Format( - PyExc_ValueError, - "Incorrect method: %x", method - ); - return NULL; - } uint8_t flags = cursor[3]; - if (flags != FEXTRA) { - PyErr_SetString( - PyExc_NotImplementedError, - "Only bgzip files with only FEXTRA flag set are supported." - ); - return NULL; - } uint16_t xlen = load_u16_le(cursor + 10); - if (xlen != 6) { - PyErr_SetString( - PyExc_NotImplementedError, - "Only bgzip files with one extra field are supported." - ); - return NULL; - } uint8_t si1 = cursor[12]; uint8_t si2 = cursor[13]; - uint16_t subfield_length = load_u16_le(cursor + 14); - if (si1 != 66 || si2 != 67 || subfield_length != 2) { + uint16_t subfield_length = load_u16_le(cursor + 14); + if ( + magic1 != 31 || + magic2 != 139 || + method != 8 || + flags != FEXTRA || + xlen != 6 || + si1 != 66 || + si2 != 67 || + subfield_length != 2 + ) { PyErr_Format( - PyExc_ValueError, - "Incorrectly formatted magic and subfield length, " - "expected 66, 67, 2 got %d, %d, %d", - si1, si2, subfield_length + PyExc_ValueError, + "Incorrect bgzip header:\n" + "magic: %x, %x\n" + "method: %x\n" + "flags: %x\n" + "xlen: %d\n" + "si1, si2: %d, %d \n" + "subfield_length: %d", + magic1, magic2, method, flags, xlen, si1, si2, subfield_length ); return NULL; } diff --git a/src/isal/isa-l b/src/isal/isa-l index c387163f..e3569fb7 160000 --- a/src/isal/isa-l +++ b/src/isal/isa-l @@ -1 +1 @@ -Subproject commit c387163fcbca62701d646149564c550c78a4f985 +Subproject commit e3569fb7224f071fe5ae510453f4451497385e44 From cd97677c558787378f50a8a1ffcd06de0f615a2e Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 18 Mar 2025 16:22:02 +0100 Subject: [PATCH 4/8] Correct implementation of algorithm --- src/isal/_bgzipmodule.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/isal/_bgzipmodule.c b/src/isal/_bgzipmodule.c index 0e602897..a8c6e311 100644 --- a/src/isal/_bgzipmodule.c +++ b/src/isal/_bgzipmodule.c @@ -37,9 +37,8 @@ static PyObject *find_last_bgzip_end(PyObject *module, PyObject *buffer_obj) { const uint8_t *data_end = data + data_length; const uint8_t *cursor = data; - Py_ssize_t answer = 0; while (true) { - if (data_end - cursor < 18) { + if (cursor + 18 > data_end) { break; } uint8_t magic1 = cursor[0]; @@ -74,10 +73,13 @@ static PyObject *find_last_bgzip_end(PyObject *module, PyObject *buffer_obj) { return NULL; } uint16_t block_size = load_u16_le(cursor + 16); - size_t actual_block_size = block_size + 1; - cursor += actual_block_size; + const uint8_t *new_start = cursor + block_size + 1; + if (new_start > data_end) { + break; + } + cursor = new_start; } - return PyLong_FromSsize_t(answer); + return PyLong_FromSsize_t(cursor - data); } static PyMethodDef _bgzip_methods[] = { From ac876abfa15e9aac2a79e2e27c7b889d3dac2589 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 18 Mar 2025 17:05:42 +0100 Subject: [PATCH 5/8] Get reader in workable state --- src/isal/_bgzip.pyi | 8 ++++++++ src/isal/igzip_threaded.py | 21 ++++++++++++--------- 2 files changed, 20 insertions(+), 9 deletions(-) create mode 100644 src/isal/_bgzip.pyi diff --git a/src/isal/_bgzip.pyi b/src/isal/_bgzip.pyi new file mode 100644 index 00000000..b41a42ad --- /dev/null +++ b/src/isal/_bgzip.pyi @@ -0,0 +1,8 @@ +# Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, +# 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022 +# Python Software Foundation; All Rights Reserved + +# This file is part of python-isal which is distributed under the +# PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2. + +def find_last_bgzip_end(__data: bytes) -> int: ... diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 8cce2650..16b98ef4 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -14,7 +14,7 @@ import threading from typing import List, Optional, Tuple -from . import igzip, isal_zlib +from . import igzip, isal_zlib, _bgzip DEFLATE_WINDOW_SIZE = 2 ** 15 @@ -167,7 +167,7 @@ def closed(self) -> bool: return self._closed -class ThreadedBGZipReader(io.RawIOBase): +class _ThreadedBGzipReader(io.RawIOBase): def __init__(self, filename, threads=2, queue_size=2, block_size=1024 * 1024): if threads < 2: raise RuntimeError("_ThreadedGzipReader class handles that use case.") @@ -175,6 +175,7 @@ def __init__(self, filename, threads=2, queue_size=2, block_size=1024 * 1024): self.lock = threading.Lock() self.pos = 0 + self._read_from_index = 0 self.read_file = False self.input_queues = [queue.Queue(queue_size) for _ in range(threads)] self.output_queues = [queue.Queue(queue_size) for _ in range(threads)] @@ -203,13 +204,16 @@ def _read_input(self): while self.running and self._calling_thread.is_alive(): to_read = block_size - len(previous_block) block = previous_block + self.raw.read(to_read) - blocks_end = find_last_bgzip_end(block) + if block == b"": + return + blocks_end = _bgzip.find_last_bgzip_end(block) compressed = block[:blocks_end] previous_block = block[blocks_end:] input_queue = self.input_queues[input_index] while self.running and self._calling_thread.is_alive(): try: input_queue.put(compressed, timeout=0.05) + break except queue.Full: pass input_index += 1 @@ -254,15 +258,14 @@ def _set_error_and_empty_queue(self, error, q): def readinto(self, b): self._check_closed() result = self.buffer.readinto(b) - index = 0 - number_of_queues = len(self.output_queues) if result == 0: - output_queue = self.output_queues[index] + output_queue = self.output_queues[self._read_from_index] while True: try: data_from_queue = output_queue.get(timeout=0.01) - index += 1 - index %= number_of_queues + output_queue.task_done() + self._read_from_index += 1 + self._read_from_index %= len(self.output_queues) break except queue.Empty: if self.exception: @@ -289,7 +292,7 @@ def close(self) -> None: self.input_worker.join() for worker in self.output_workers: worker.join() - self.fileobj.close() + self.buffer.close() if self.closefd: self.raw.close() self._closed = True From deaa0735fbd2e1c19dc5bcadca9ec9f436848045 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 18 Mar 2025 17:57:02 +0100 Subject: [PATCH 6/8] Massive reduction of memory usage --- src/isal/igzip_threaded.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 16b98ef4..170faeb6 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -200,19 +200,23 @@ def _read_input(self): block_size = self.block_size number_of_queues = len(self.output_queues) input_index = 0 - previous_block = b"" + buffer = bytearray(block_size) + buffer_view = memoryview(buffer) + offset = 0 while self.running and self._calling_thread.is_alive(): - to_read = block_size - len(previous_block) - block = previous_block + self.raw.read(to_read) - if block == b"": + bytes_read = self.raw.readinto(buffer_view[offset:]) + total_read = offset + bytes_read + if bytes_read == 0: return - blocks_end = _bgzip.find_last_bgzip_end(block) - compressed = block[:blocks_end] - previous_block = block[blocks_end:] + blocks_end = _bgzip.find_last_bgzip_end(buffer_view[:total_read]) + compressed = bytes(buffer_view[:blocks_end]) + leftover = buffer_view[blocks_end:total_read] + offset = leftover.nbytes + buffer[:offset] = leftover input_queue = self.input_queues[input_index] while self.running and self._calling_thread.is_alive(): try: - input_queue.put(compressed, timeout=0.05) + input_queue.put(compressed, timeout=0.01) break except queue.Full: pass @@ -224,7 +228,7 @@ def _decompress(self, index: int): output_queue = self.output_queues[index] while self.running and self._calling_thread.is_alive(): try: - input_data = input_queue.get(timeout=0.05) + input_data = input_queue.get(timeout=0.01) except queue.Empty: if not self.input_worker.is_alive(): return From 3a8b7470861a3f8908429b452af30da9695404a4 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 18 Mar 2025 18:53:45 +0100 Subject: [PATCH 7/8] Set one thread as a default --- src/isal/igzip_threaded.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 170faeb6..d2bef8d3 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -168,9 +168,14 @@ def closed(self) -> bool: class _ThreadedBGzipReader(io.RawIOBase): - def __init__(self, filename, threads=2, queue_size=2, block_size=1024 * 1024): - if threads < 2: - raise RuntimeError("_ThreadedGzipReader class handles that use case.") + """ + Reads BGZip files multithreaded. For one thread, the normal gzip reader + class is more efficient, as it operates fewer queues and keeps less + allocated data around. + """ + def __init__(self, filename, threads=1, queue_size=2, block_size=1024 * 1024): + if threads < 1: + raise RuntimeError("At least one thread is needed") self.raw, self.closefd = open_as_binary_stream(filename, "rb") self.lock = threading.Lock() From 6455ec014a301a6bfe3269c1e9338226d6d1270b Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 19 Mar 2025 10:01:19 +0100 Subject: [PATCH 8/8] Remove unused function --- bgzip_runner.py | 19 +++++++++++++++++++ src/isal/igzip_threaded.py | 12 ------------ src/isal/isa-l | 2 +- 3 files changed, 20 insertions(+), 13 deletions(-) create mode 100755 bgzip_runner.py diff --git a/bgzip_runner.py b/bgzip_runner.py new file mode 100755 index 00000000..17659a5b --- /dev/null +++ b/bgzip_runner.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +import io +import sys + +from isal.igzip_threaded import _ThreadedGzipReader, _ThreadedBGzipReader + +def main(): + file = sys.argv[1] + with io.BufferedReader( + _ThreadedBGzipReader(file, threads=8) + ) as f: + while True: + block = f.read(128 * 1024) + if block == b"": + return + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index d2bef8d3..ecab37cf 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -252,18 +252,6 @@ def _decompress(self, index: int): except queue.Full: pass - def _set_error_and_empty_queue(self, error, q): - with self.lock: - self.exception = error - # Abort everything and empty the queue - self.running = False - while True: - try: - _ = q.get(timeout=0.05) - q.task_done() - except queue.Empty: - return - def readinto(self, b): self._check_closed() result = self.buffer.readinto(b) diff --git a/src/isal/isa-l b/src/isal/isa-l index e3569fb7..c387163f 160000 --- a/src/isal/isa-l +++ b/src/isal/isa-l @@ -1 +1 @@ -Subproject commit e3569fb7224f071fe5ae510453f4451497385e44 +Subproject commit c387163fcbca62701d646149564c550c78a4f985