From d5d22be064f7e93a8423e452c0845ae735bf1c4e Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Sat, 26 Oct 2019 09:20:17 -0400 Subject: [PATCH 1/2] ENH: Always fsync pickle files --- nipype/utils/filemanip.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index d846ce4bca..057178f4e4 100644 --- a/nipype/utils/filemanip.py +++ b/nipype/utils/filemanip.py @@ -790,9 +790,9 @@ def read_stream(stream, logger=None, encoding=None): def savepkl(filename, record, versioning=False): - pklopen = gzip.open if filename.endswith('.pklz') else open with SoftFileLock('%s.lock' % filename): - with pklopen(filename, 'wb') as pkl_file: + with open(filename, 'wb') as fobj: + pkl_file = gzip.GzipFile(fileobj=fobj) if filename.endswith('.pklz') else fobj if versioning: from nipype import __version__ as version metadata = json.dumps({'version': version}) @@ -801,6 +801,9 @@ def savepkl(filename, record, versioning=False): pkl_file.write('\n'.encode('utf-8')) pickle.dump(record, pkl_file) + # Pickle files need to be available immediately, so force a sync + fobj.flush() + os.fsync(fobj.fileno()) rst_levels = ['=', '-', '~', '+'] From a07c138cff91b239bb292c729e9e07b0fa9fb4e2 Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Mon, 28 Oct 2019 22:26:12 -0400 Subject: [PATCH 2/2] ENH: Add sync option to savepkl, sync when saving result file --- nipype/pipeline/engine/utils.py | 8 ++++---- nipype/utils/filemanip.py | 25 ++++++++++++------------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index a5245dda48..218c30850b 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -236,17 +236,17 @@ def save_resultfile(result, cwd, name, rebase=None): if result.outputs is None: logger.warning('Storing result file without outputs') - savepkl(resultsfile, result) + savepkl(resultsfile, result, sync=True) return try: output_names = result.outputs.copyable_trait_names() except AttributeError: logger.debug('Storing non-traited results, skipping rebase of paths') - savepkl(resultsfile, result) + savepkl(resultsfile, result, sync=True) return if not rebase: - savepkl(resultsfile, result) + savepkl(resultsfile, result, sync=True) return backup_traits = {} @@ -262,7 +262,7 @@ def save_resultfile(result, cwd, name, rebase=None): backup_traits[key] = old val = rebase_path_traits(result.outputs.trait(key), old, cwd) setattr(result.outputs, key, val) - savepkl(resultsfile, result) + savepkl(resultsfile, result, sync=True) finally: # Restore resolved paths from the outputs dict no matter what for key, val in list(backup_traits.items()): diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index 057178f4e4..ca82778e7d 100644 --- a/nipype/utils/filemanip.py +++ b/nipype/utils/filemanip.py @@ -789,19 +789,18 @@ def read_stream(stream, logger=None, encoding=None): return out.splitlines() -def savepkl(filename, record, versioning=False): - with SoftFileLock('%s.lock' % filename): - with open(filename, 'wb') as fobj: - pkl_file = gzip.GzipFile(fileobj=fobj) if filename.endswith('.pklz') else fobj - if versioning: - from nipype import __version__ as version - metadata = json.dumps({'version': version}) - - pkl_file.write(metadata.encode('utf-8')) - pkl_file.write('\n'.encode('utf-8')) - - pickle.dump(record, pkl_file) - # Pickle files need to be available immediately, so force a sync +def savepkl(filename, record, versioning=False, sync=False): + with open(filename, 'wb') as fobj: + pkl_file = gzip.GzipFile(fileobj=fobj) if filename.endswith('.pklz') else fobj + if versioning: + from nipype import __version__ as version + metadata = json.dumps({'version': version}) + + pkl_file.write(metadata.encode('utf-8')) + pkl_file.write('\n'.encode('utf-8')) + + pickle.dump(record, pkl_file) + if sync: fobj.flush() os.fsync(fobj.fileno())