From f7a90fe213dce4dfe4b5c93d8b5a736582f89dcf Mon Sep 17 00:00:00 2001 From: Chris Markiewicz Date: Thu, 9 Mar 2023 20:52:21 -0500 Subject: [PATCH] RF: Pull compression detection logic into a central private module --- nibabel/_compression.py | 49 ++++++++++++++++++++++++++++++++++++++ nibabel/filebasedimages.py | 3 ++- nibabel/openers.py | 16 +------------ nibabel/volumeutils.py | 17 +------------ 4 files changed, 53 insertions(+), 32 deletions(-) create mode 100644 nibabel/_compression.py diff --git a/nibabel/_compression.py b/nibabel/_compression.py new file mode 100644 index 000000000..bf13895c8 --- /dev/null +++ b/nibabel/_compression.py @@ -0,0 +1,49 @@ +# emacs: -*- mode: python-mode; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## +# +# See COPYING file distributed along with the NiBabel package for the +# copyright and license terms. +# +### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ## +"""Constants and types for dealing transparently with compression""" +from __future__ import annotations + +import bz2 +import gzip +import io +import typing as ty + +from .optpkg import optional_package + +if ty.TYPE_CHECKING: # pragma: no cover + import indexed_gzip # type: ignore + import pyzstd + + HAVE_INDEXED_GZIP = True + HAVE_ZSTD = True +else: + indexed_gzip, HAVE_INDEXED_GZIP, _ = optional_package('indexed_gzip') + pyzstd, HAVE_ZSTD, _ = optional_package('pyzstd') + + +# Collections of types for isinstance or exception matching +COMPRESSED_FILE_LIKES: tuple[type[io.IOBase], ...] = ( + bz2.BZ2File, + gzip.GzipFile, +) +COMPRESSION_ERRORS: tuple[type[BaseException], ...] = ( + OSError, # BZ2File + gzip.BadGzipFile, +) + +if HAVE_INDEXED_GZIP: + COMPRESSED_FILE_LIKES += (indexed_gzip.IndexedGzipFile,) + COMPRESSION_ERRORS += (indexed_gzip.ZranError,) + from indexed_gzip import IndexedGzipFile # type: ignore +else: + IndexedGzipFile = gzip.GzipFile + +if HAVE_ZSTD: + COMPRESSED_FILE_LIKES += (pyzstd.ZstdFile,) + COMPRESSION_ERRORS += (pyzstd.ZstdError,) diff --git a/nibabel/filebasedimages.py b/nibabel/filebasedimages.py index 685b11b79..3d1a95c1a 100644 --- a/nibabel/filebasedimages.py +++ b/nibabel/filebasedimages.py @@ -15,6 +15,7 @@ from typing import Type from urllib import request +from ._compression import COMPRESSION_ERRORS from .fileholders import FileHolder, FileMap from .filename_parser import TypesFilenamesError, _stringify_path, splitext_addext, types_filenames from .openers import ImageOpener @@ -421,7 +422,7 @@ def _sniff_meta_for( try: with ImageOpener(meta_fname, 'rb') as fobj: binaryblock = fobj.read(sniff_nbytes) - except (OSError, EOFError): + except COMPRESSION_ERRORS + (OSError, EOFError): return None return (binaryblock, meta_fname) diff --git a/nibabel/openers.py b/nibabel/openers.py index 3e3b2fb29..90c7774d1 100644 --- a/nibabel/openers.py +++ b/nibabel/openers.py @@ -15,12 +15,11 @@ from bz2 import BZ2File from os.path import splitext -from nibabel.optpkg import optional_package +from ._compression import HAVE_INDEXED_GZIP, IndexedGzipFile, pyzstd if ty.TYPE_CHECKING: # pragma: no cover from types import TracebackType - import pyzstd from _typeshed import WriteableBuffer ModeRT = ty.Literal['r', 'rt'] @@ -32,8 +31,6 @@ Mode = ty.Union[ModeR, ModeW] OpenerDef = tuple[ty.Callable[..., io.IOBase], tuple[str, ...]] -else: - pyzstd = optional_package('pyzstd')[0] @ty.runtime_checkable @@ -45,17 +42,6 @@ def write(self, b: bytes, /) -> int | None: ... # pragma: no cover -try: - from indexed_gzip import IndexedGzipFile # type: ignore - - HAVE_INDEXED_GZIP = True -except ImportError: - # nibabel.openers.IndexedGzipFile is imported by nibabel.volumeutils - # to detect compressed file types, so we give a fallback value here. - IndexedGzipFile = gzip.GzipFile - HAVE_INDEXED_GZIP = False - - class DeterministicGzipFile(gzip.GzipFile): """Deterministic variant of GzipFile diff --git a/nibabel/volumeutils.py b/nibabel/volumeutils.py index d61a41e67..90e5e5ff3 100644 --- a/nibabel/volumeutils.py +++ b/nibabel/volumeutils.py @@ -9,36 +9,28 @@ """Utility functions for analyze-like formats""" from __future__ import annotations -import gzip import io import sys import typing as ty import warnings -from bz2 import BZ2File from functools import reduce from operator import getitem, mul from os.path import exists, splitext import numpy as np +from ._compression import COMPRESSED_FILE_LIKES from .casting import OK_FLOATS, shared_range from .externals.oset import OrderedSet -from .openers import IndexedGzipFile -from .optpkg import optional_package if ty.TYPE_CHECKING: # pragma: no cover import numpy.typing as npt - import pyzstd - - HAVE_ZSTD = True Scalar = np.number | float K = ty.TypeVar('K') V = ty.TypeVar('V') DT = ty.TypeVar('DT', bound=np.generic) -else: - pyzstd, HAVE_ZSTD, _ = optional_package('pyzstd') sys_is_le = sys.byteorder == 'little' native_code = sys_is_le and '<' or '>' @@ -55,13 +47,6 @@ #: default compression level when writing gz and bz2 files default_compresslevel = 1 -#: file-like classes known to hold compressed data -COMPRESSED_FILE_LIKES: tuple[type[io.IOBase], ...] = (gzip.GzipFile, BZ2File, IndexedGzipFile) - -# Enable .zst support if pyzstd installed. -if HAVE_ZSTD: - COMPRESSED_FILE_LIKES = (*COMPRESSED_FILE_LIKES, pyzstd.ZstdFile) - class Recoder: """class to return canonical code(s) from code or aliases