From 651a042029ae1e67230dc8dd9c0f6d1451341f96 Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Tue, 10 Sep 2019 14:17:55 -0400 Subject: [PATCH 1/4] enh: replace portalocker with filelock and use locking for reading/writing pickled files --- nipype/algorithms/misc.py | 13 +-- nipype/external/cloghandler.py | 23 ++---- nipype/external/portalocker.py | 145 --------------------------------- nipype/utils/config.py | 22 ++--- nipype/utils/filemanip.py | 85 +++++++++---------- requirements.txt | 1 + 6 files changed, 64 insertions(+), 225 deletions(-) delete mode 100644 nipype/external/portalocker.py diff --git a/nipype/algorithms/misc.py b/nipype/algorithms/misc.py index a2eaad3610..7e0305aa36 100644 --- a/nipype/algorithms/misc.py +++ b/nipype/algorithms/misc.py @@ -799,11 +799,11 @@ def _run_interface(self, runtime): '(http://pandas.pydata.org/) to run.'), e) try: - import lockfile as pl + from filelock import SoftFileLock self._have_lock = True except ImportError: from warnings import warn - warn(('Python module lockfile was not found: AddCSVRow will not be' + warn(('Python module filelock was not found: AddCSVRow will not be' ' thread-safe in multi-processor execution')) input_dict = {} @@ -822,7 +822,7 @@ def _run_interface(self, runtime): df = pd.DataFrame([input_dict]) if self._have_lock: - self._lock = pl.FileLock(self.inputs.in_file) + self._lock = SoftFileLock(self.inputs.in_file + '.lock') # Acquire lock self._lock.acquire() @@ -837,13 +837,6 @@ def _run_interface(self, runtime): if self._have_lock: self._lock.release() - # Using nipype.external.portalocker this might be something like: - # with pl.Lock(self.inputs.in_file, timeout=1) as fh: - # if op.exists(fh): - # formerdf = pd.read_csv(fh, index_col=0) - # df = pd.concat([formerdf, df], ignore_index=True) - # df.to_csv(fh) - return runtime def _list_outputs(self): diff --git a/nipype/external/cloghandler.py b/nipype/external/cloghandler.py index 5fda934c84..d99435b94f 100644 --- a/nipype/external/cloghandler.py +++ b/nipype/external/cloghandler.py @@ -36,10 +36,6 @@ testing, performance was more than adequate, but if you need a high-volume or low-latency solution, I suggest you look elsewhere. -This module currently only support the 'nt' and 'posix' platforms due to the -usage of the portalocker module. I do not have access to any other platforms -for testing, patches are welcome. - See the README file for an example usage of this module. """ @@ -63,13 +59,7 @@ except ImportError: codecs = None -# Question/TODO: Should we have a fallback mode if we can't load portalocker / -# we should still be better off than with the standard RotattingFileHandler -# class, right? We do some rename checking... that should prevent some file -# clobbering that the builtin class allows. - -# sibling module than handles all the ugly platform-specific details of file locking -from .portalocker import lock, unlock, LOCK_EX, LOCK_NB, LockException +from filelock import SoftFileLock # A client can set this to true to automatically convert relative paths to # absolute paths (which will also hide the absolute path warnings) @@ -168,11 +158,8 @@ def __init__(self, self.maxBytes = maxBytes self.backupCount = backupCount # Prevent multiple extensions on the lock file (Only handles the normal "*.log" case.) - if filename.endswith(".log"): - lock_file = filename[:-4] - else: - lock_file = filename - self.stream_lock = open(lock_file + ".lock", "w") + self.lock_file = filename + '.lock' + self.stream_lock = SoftFileLock(self.lock_file) # For debug mode, swap out the "_degrade()" method with a more a verbose one. if debug: @@ -189,7 +176,7 @@ def acquire(self): in 'degraded' mode. """ # handle thread lock Handler.acquire(self) - lock(self.stream_lock, LOCK_EX) + self.stream_lock.acquire() if self.stream.closed: self._openFile(self.mode) @@ -206,7 +193,7 @@ def release(self): self.stream.close() finally: try: - unlock(self.stream_lock) + self.stream_lock.release() finally: # release thread lock Handler.release(self) diff --git a/nipype/external/portalocker.py b/nipype/external/portalocker.py deleted file mode 100644 index 1da24d894c..0000000000 --- a/nipype/external/portalocker.py +++ /dev/null @@ -1,145 +0,0 @@ -# -*- coding: utf-8 -*- -# portalocker.py - Cross-platform (posix/nt) API for flock-style file locking. -# Requires python 1.5.2 or better. -'''Cross-platform (posix/nt) API for flock-style file locking. - -Synopsis: - - import portalocker - file = open('somefile', 'r+') - portalocker.lock(file, portalocker.LOCK_EX) - file.seek(12) - file.write('foo') - file.close() - -If you know what you're doing, you may choose to - - portalocker.unlock(file) - -before closing the file, but why? - -Methods: - - lock( file, flags ) - unlock( file ) - -Constants: - - LOCK_EX - LOCK_SH - LOCK_NB - -Exceptions: - - LockException - -Notes: - -For the 'nt' platform, this module requires the Python Extensions for Windows. -Be aware that this may not work as expected on Windows 95/98/ME. - -History: - -I learned the win32 technique for locking files from sample code -provided by John Nielsen in the documentation -that accompanies the win32 modules. - -Author: Jonathan Feinberg , - Lowell Alleman -Version: $Id: portalocker.py 5474 2008-05-16 20:53:50Z lowell $ - -''' -from __future__ import (print_function, division, unicode_literals, - absolute_import) -from builtins import open - -__all__ = [ - 'lock', - 'unlock', - 'LOCK_EX', - 'LOCK_SH', - 'LOCK_NB', - 'LockException', -] - -import os - - -class LockException(Exception): - # Error codes: - LOCK_FAILED = 1 - - -if os.name == 'nt': - import win32con - import win32file - import pywintypes - LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK - LOCK_SH = 0 # the default - LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY - # is there any reason not to reuse the following structure? - __overlapped = pywintypes.OVERLAPPED() -elif os.name == 'posix': - import fcntl - LOCK_EX = fcntl.LOCK_EX - LOCK_SH = fcntl.LOCK_SH - LOCK_NB = fcntl.LOCK_NB -else: - raise RuntimeError('PortaLocker only defined for nt and posix platforms') - -if os.name == 'nt': - - def lock(file, flags): - hfile = win32file._get_osfhandle(file.fileno()) - try: - win32file.LockFileEx(hfile, flags, 0, -0x10000, __overlapped) - except pywintypes.error as exc_value: - # error: (33, 'LockFileEx', 'The process cannot access the file - # because another process has locked a portion of the file.') - if exc_value[0] == 33: - raise LockException(LockException.LOCK_FAILED, exc_value[2]) - else: - # Q: Are there exceptions/codes we should be dealing with here? - raise - - def unlock(file): - hfile = win32file._get_osfhandle(file.fileno()) - try: - win32file.UnlockFileEx(hfile, 0, -0x10000, __overlapped) - except pywintypes.error as exc_value: - if exc_value[0] == 158: - # error: (158, 'UnlockFileEx', 'The segment is already - # unlocked.') To match the 'posix' implementation, silently - # ignore this error - pass - else: - # Q: Are there exceptions/codes we should be dealing with here? - raise - -elif os.name == 'posix': - - def lock(file, flags): - try: - fcntl.flock(file.fileno(), flags) - except IOError as exc_value: - # The exception code varies on different systems so we'll catch - # every IO error - raise LockException(*exc_value) - - def unlock(file): - fcntl.flock(file.fileno(), fcntl.LOCK_UN) - - -if __name__ == '__main__': - from time import time, strftime, localtime - import sys - from . import portalocker - - log = open('log.txt', 'a+') - portalocker.lock(log, portalocker.LOCK_EX) - timestamp = strftime('%m/%d/%Y %H:%M:%S\n', localtime(time())) - log.write(timestamp) - - print('Wrote lines. Hit enter to release lock.') - dummy = sys.stdin.readline() - log.close() diff --git a/nipype/utils/config.py b/nipype/utils/config.py index 79c0bf6b51..2fd56f11a7 100644 --- a/nipype/utils/config.py +++ b/nipype/utils/config.py @@ -25,7 +25,7 @@ from future import standard_library from .misc import str2bool -from ..external import portalocker +from filelock import SoftFileLock standard_library.install_aliases() @@ -209,9 +209,9 @@ def get_data(self, key): """Read options file""" if not os.path.exists(self.data_file): return None - with open(self.data_file, 'rt') as file: - portalocker.lock(file, portalocker.LOCK_EX) - datadict = load(file) + with SoftFileLock(self.data_file + '.lock'): + with open(self.data_file, 'rt') as file: + datadict = load(file) if key in datadict: return datadict[key] return None @@ -220,17 +220,17 @@ def save_data(self, key, value): """Store config flie""" datadict = {} if os.path.exists(self.data_file): - with open(self.data_file, 'rt') as file: - portalocker.lock(file, portalocker.LOCK_EX) - datadict = load(file) + with SoftFileLock(self.data_file + '.lock'): + with open(self.data_file, 'rt') as file: + datadict = load(file) else: dirname = os.path.dirname(self.data_file) if not os.path.exists(dirname): mkdir_p(dirname) - with open(self.data_file, 'wt') as file: - portalocker.lock(file, portalocker.LOCK_EX) - datadict[key] = value - dump(datadict, file) + with SoftFileLock(self.data_file + '.lock'): + with open(self.data_file, 'wt') as file: + datadict[key] = value + dump(datadict, file) def update_config(self, config_dict): """Extend internal dictionary with config_dict""" diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index 6e6fb83623..51373ebc0e 100644 --- a/nipype/utils/filemanip.py +++ b/nipype/utils/filemanip.py @@ -21,6 +21,7 @@ import contextlib import posixpath import simplejson as json +from filelock import SoftFileLock from builtins import str, bytes, open @@ -684,36 +685,37 @@ def loadpkl(infile): unpkl = None with indirectory(infile.parent): - pkl_file = pklopen(infile.name, 'rb') - - try: # Look if pkl file contains version file - pkl_metadata_line = pkl_file.readline() - pkl_metadata = json.loads(pkl_metadata_line) - except (UnicodeDecodeError, json.JSONDecodeError): - # Could not get version info - pkl_file.seek(0) - - try: - unpkl = pickle.load(pkl_file) - except UnicodeDecodeError: - # Was this pickle created with Python 2.x? - unpkl = pickle.load(pkl_file, fix_imports=True, encoding='utf-8') - fmlogger.info('Successfully loaded pkl in compatibility mode.') - # Unpickling problems - except Exception as e: - if pkl_metadata and 'version' in pkl_metadata: - from nipype import __version__ as version - if pkl_metadata['version'] != version: - fmlogger.error("""\ -Attempted to open a results file generated by Nipype version %s, \ -with an incompatible Nipype version (%s)""", pkl_metadata['version'], version) - raise e - fmlogger.error("""\ -No metadata was found in the pkl file. Make sure you are currently using \ -the same Nipype version from the generated pkl.""") - raise e - finally: - pkl_file.close() + with SoftFileLock(infile.name + '.lock'): + pkl_file = pklopen(infile.name, 'rb') + + try: # Look if pkl file contains version file + pkl_metadata_line = pkl_file.readline() + pkl_metadata = json.loads(pkl_metadata_line) + except (UnicodeDecodeError, json.JSONDecodeError): + # Could not get version info + pkl_file.seek(0) + + try: + unpkl = pickle.load(pkl_file) + except UnicodeDecodeError: + # Was this pickle created with Python 2.x? + unpkl = pickle.load(pkl_file, fix_imports=True, encoding='utf-8') + fmlogger.info('Successfully loaded pkl in compatibility mode.') + # Unpickling problems + except Exception as e: + if pkl_metadata and 'version' in pkl_metadata: + from nipype import __version__ as version + if pkl_metadata['version'] != version: + fmlogger.error("""\ + Attempted to open a results file generated by Nipype version %s, \ + with an incompatible Nipype version (%s)""", pkl_metadata['version'], version) + raise e + fmlogger.error("""\ + No metadata was found in the pkl file. Make sure you are currently using \ + the same Nipype version from the generated pkl.""") + raise e + finally: + pkl_file.close() if unpkl is None: raise ValueError('Loading %s resulted in None.' % infile) @@ -754,20 +756,21 @@ def read_stream(stream, logger=None, encoding=None): def savepkl(filename, record, versioning=False): - if filename.endswith('pklz'): - pkl_file = gzip.open(filename, 'wb') - else: - pkl_file = open(filename, 'wb') + with SoftFileLock(filename + '.lock'): + if filename.endswith('pklz'): + pkl_file = gzip.open(filename, 'wb') + else: + pkl_file = open(filename, 'wb') - if versioning: - from nipype import __version__ as version - metadata = json.dumps({'version': version}) + if versioning: + from nipype import __version__ as version + metadata = json.dumps({'version': version}) - pkl_file.write(metadata.encode('utf-8')) - pkl_file.write('\n'.encode('utf-8')) + pkl_file.write(metadata.encode('utf-8')) + pkl_file.write('\n'.encode('utf-8')) - pickle.dump(record, pkl_file) - pkl_file.close() + pickle.dump(record, pkl_file) + pkl_file.close() rst_levels = ['=', '-', '~', '+'] diff --git a/requirements.txt b/requirements.txt index 99b97a19a6..44226da48b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ python-dateutil>=2.2 scipy>=0.14 simplejson>=3.8.0 traits>=4.6 +filelock>= 3.0.0 From e1c792bd214d0f1d74cec23cba9fa1bd4f3187b7 Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Tue, 10 Sep 2019 15:14:38 -0400 Subject: [PATCH 2/4] fix: add filelock to packages --- nipype/info.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nipype/info.py b/nipype/info.py index 951c93c528..83ce1ca43d 100644 --- a/nipype/info.py +++ b/nipype/info.py @@ -164,6 +164,7 @@ def get_nipype_gitversion(): 'scipy>=%s,<%s ; python_version <= "3.4"' % (SCIPY_MIN_VERSION, SCIPY_MAX_VERSION_34), 'simplejson>=%s' % SIMPLEJSON_MIN_VERSION, 'traits>=%s,!=5.0' % TRAITS_MIN_VERSION, + 'filelock>=3.0.0' ] # neurdflib has to come after prov From 26b0eb63cad767e7dc4d831d37e71fcc9ced7450 Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Tue, 10 Sep 2019 19:26:23 -0400 Subject: [PATCH 3/4] Apply suggestions from code review Co-Authored-By: Oscar Esteban --- nipype/algorithms/misc.py | 2 +- nipype/external/cloghandler.py | 2 +- nipype/utils/config.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nipype/algorithms/misc.py b/nipype/algorithms/misc.py index 7e0305aa36..4fcd7c8d85 100644 --- a/nipype/algorithms/misc.py +++ b/nipype/algorithms/misc.py @@ -822,7 +822,7 @@ def _run_interface(self, runtime): df = pd.DataFrame([input_dict]) if self._have_lock: - self._lock = SoftFileLock(self.inputs.in_file + '.lock') + self._lock = SoftFileLock('%s.lock' % self.inputs.in_file) # Acquire lock self._lock.acquire() diff --git a/nipype/external/cloghandler.py b/nipype/external/cloghandler.py index d99435b94f..05e28968dd 100644 --- a/nipype/external/cloghandler.py +++ b/nipype/external/cloghandler.py @@ -158,7 +158,7 @@ def __init__(self, self.maxBytes = maxBytes self.backupCount = backupCount # Prevent multiple extensions on the lock file (Only handles the normal "*.log" case.) - self.lock_file = filename + '.lock' + self.lock_file = '%s.lock' % filename self.stream_lock = SoftFileLock(self.lock_file) # For debug mode, swap out the "_degrade()" method with a more a verbose one. diff --git a/nipype/utils/config.py b/nipype/utils/config.py index 2fd56f11a7..d6d6d0879d 100644 --- a/nipype/utils/config.py +++ b/nipype/utils/config.py @@ -209,7 +209,7 @@ def get_data(self, key): """Read options file""" if not os.path.exists(self.data_file): return None - with SoftFileLock(self.data_file + '.lock'): + with SoftFileLock('%s.lock' % self.data_file): with open(self.data_file, 'rt') as file: datadict = load(file) if key in datadict: @@ -220,14 +220,14 @@ def save_data(self, key, value): """Store config flie""" datadict = {} if os.path.exists(self.data_file): - with SoftFileLock(self.data_file + '.lock'): + with SoftFileLock('%s.lock' % self.data_file): with open(self.data_file, 'rt') as file: datadict = load(file) else: dirname = os.path.dirname(self.data_file) if not os.path.exists(dirname): mkdir_p(dirname) - with SoftFileLock(self.data_file + '.lock'): + with SoftFileLock('%s.lock' % self.data_file): with open(self.data_file, 'wt') as file: datadict[key] = value dump(datadict, file) From fdf12b759cebb46eccc5479fe10d648466bccd5f Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Tue, 10 Sep 2019 20:48:35 -0400 Subject: [PATCH 4/4] fix: address review comments --- nipype/utils/filemanip.py | 80 ++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index 51373ebc0e..8fcfdc8beb 100644 --- a/nipype/utils/filemanip.py +++ b/nipype/utils/filemanip.py @@ -685,37 +685,33 @@ def loadpkl(infile): unpkl = None with indirectory(infile.parent): - with SoftFileLock(infile.name + '.lock'): - pkl_file = pklopen(infile.name, 'rb') - - try: # Look if pkl file contains version file - pkl_metadata_line = pkl_file.readline() - pkl_metadata = json.loads(pkl_metadata_line) - except (UnicodeDecodeError, json.JSONDecodeError): - # Could not get version info - pkl_file.seek(0) - - try: - unpkl = pickle.load(pkl_file) - except UnicodeDecodeError: - # Was this pickle created with Python 2.x? - unpkl = pickle.load(pkl_file, fix_imports=True, encoding='utf-8') - fmlogger.info('Successfully loaded pkl in compatibility mode.') - # Unpickling problems - except Exception as e: - if pkl_metadata and 'version' in pkl_metadata: - from nipype import __version__ as version - if pkl_metadata['version'] != version: - fmlogger.error("""\ - Attempted to open a results file generated by Nipype version %s, \ - with an incompatible Nipype version (%s)""", pkl_metadata['version'], version) - raise e - fmlogger.error("""\ - No metadata was found in the pkl file. Make sure you are currently using \ - the same Nipype version from the generated pkl.""") - raise e - finally: - pkl_file.close() + with SoftFileLock('%s.lock' % infile.name): + with pklopen(infile.name, 'rb') as pkl_file: + try: # Look if pkl file contains version file + pkl_metadata_line = pkl_file.readline() + pkl_metadata = json.loads(pkl_metadata_line) + except (UnicodeDecodeError, json.JSONDecodeError): + # Could not get version info + pkl_file.seek(0) + try: + unpkl = pickle.load(pkl_file) + except UnicodeDecodeError: + # Was this pickle created with Python 2.x? + unpkl = pickle.load(pkl_file, fix_imports=True, encoding='utf-8') + fmlogger.info('Successfully loaded pkl in compatibility mode.') + # Unpickling problems + except Exception as e: + if pkl_metadata and 'version' in pkl_metadata: + from nipype import __version__ as version + if pkl_metadata['version'] != version: + fmlogger.error("""\ +Attempted to open a results file generated by Nipype version %s, \ +with an incompatible Nipype version (%s)""", pkl_metadata['version'], version) + raise e + fmlogger.error("""\ +No metadata was found in the pkl file. Make sure you are currently using \ +the same Nipype version from the generated pkl.""") + raise e if unpkl is None: raise ValueError('Loading %s resulted in None.' % infile) @@ -756,21 +752,17 @@ def read_stream(stream, logger=None, encoding=None): def savepkl(filename, record, versioning=False): - with SoftFileLock(filename + '.lock'): - if filename.endswith('pklz'): - pkl_file = gzip.open(filename, 'wb') - else: - pkl_file = open(filename, 'wb') - - if versioning: - from nipype import __version__ as version - metadata = json.dumps({'version': version}) + pklopen = gzip.open if filename.endswith('.pklz') else open + with SoftFileLock('%s.lock' % filename): + with pklopen(filename, 'wb') as pkl_file: + if versioning: + from nipype import __version__ as version + metadata = json.dumps({'version': version}) - pkl_file.write(metadata.encode('utf-8')) - pkl_file.write('\n'.encode('utf-8')) + pkl_file.write(metadata.encode('utf-8')) + pkl_file.write('\n'.encode('utf-8')) - pickle.dump(record, pkl_file) - pkl_file.close() + pickle.dump(record, pkl_file) rst_levels = ['=', '-', '~', '+']