Skip to content

Simplify code in gzip_ng.py #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ version 0.5.0-dev
+ Wheels are now build for MacOS arm64 architectures.
+ Fix a bug where READ and WRITE in zlib_ng.gzip_ng were inconsistent with the
values in gzip on Python 3.13
+ Small simplifications to the ``gzip_ng.compress`` and ``gzip_ng.decompress``
functions, which should lead to less overhead.

version 0.4.3
-----------------
Expand Down
48 changes: 11 additions & 37 deletions src/zlib_ng/gzip_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import gzip
import io
import os
import shutil
import struct
import sys
import time
Expand Down Expand Up @@ -180,50 +181,27 @@ def write(self, data):
_GzipNGReader = _GzipReader


def _create_simple_gzip_header(compresslevel: int,
mtime=None) -> bytes:
"""
Write a simple gzip header with no extra fields.
:param compresslevel: Compresslevel used to determine the xfl bytes.
:param mtime: The mtime (must support conversion to a 32-bit integer).
:return: A bytes object representing the gzip header.
"""
if mtime is None:
mtime = time.time()
if compresslevel == _COMPRESS_LEVEL_BEST:
xfl = 2
elif compresslevel == _COMPRESS_LEVEL_FAST:
xfl = 4
else:
xfl = 0
# Pack ID1 and ID2 magic bytes, method (8=deflate), header flags (no extra
# fields added to header), mtime, xfl and os (255 for unknown OS).
return struct.pack("<BBBBLBB", 0x1f, 0x8b, 8, 0, int(mtime), xfl, 255)


def compress(data, compresslevel=_COMPRESS_LEVEL_BEST, *, mtime=None):
"""Compress data in one shot and return the compressed string.
compresslevel sets the compression level in range of 0-9.
mtime can be used to set the modification time. The modification time is
set to the current time by default.
"""
if mtime == 0:
# Use zlib as it creates the header with 0 mtime by default.
# This is faster and with less overhead.
return zlib_ng.compress(data, level=compresslevel, wbits=31)
header = _create_simple_gzip_header(compresslevel, mtime)
trailer = struct.pack("<LL", zlib_ng.crc32(data), (len(data) & 0xffffffff))
# Wbits=-15 creates a raw deflate block.
return (header + zlib_ng.compress(data, level=compresslevel, wbits=-15) +
trailer)
# Wbits=31 automatically includes a gzip header and trailer.
gzip_data = zlib_ng.compress(data, level=compresslevel, wbits=31)
if mtime is None:
mtime = time.time()
# Reuse gzip header created by zlib_ng, replace mtime and OS byte for
# consistency.
header = struct.pack("<4sLBB", gzip_data, int(mtime), gzip_data[8], 255)
return header + gzip_data[10:]


def decompress(data):
"""Decompress a gzip compressed string in one shot.
Return the decompressed string.
"""
fp = io.BytesIO(data)
reader = _GzipReader(fp, max(len(data), 16))
reader = _GzipReader(data)
return reader.readall()


Expand Down Expand Up @@ -325,11 +303,7 @@ def main():
global READ_BUFFER_SIZE
READ_BUFFER_SIZE = args.buffer_size
try:
while True:
block = in_file.read(args.buffer_size)
if block == b"":
break
out_file.write(block)
shutil.copyfileobj(in_file, out_file, args.buffer_size)
finally:
if in_file is not sys.stdin.buffer:
in_file.close()
Expand Down
Loading