From c510d5e09618697301ff43d254ce2b6190a77aaf Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Mon, 9 Sep 2019 23:52:19 -0700 Subject: [PATCH 1/6] FIX: Disallow returning ``None`` in ``pipeline.utils.load_resultfile`` Prevents #3009 and #3014 from happening - although this might not solve those issues, this patch will help find their origin by making ``load_resultfile`` more strict (and letting it raise exceptions). The try .. except structure is moved to the only place it was being used within the Node code. --- nipype/pipeline/engine/nodes.py | 66 ++++++++++++++++++--------------- nipype/pipeline/engine/utils.py | 30 ++++----------- 2 files changed, 44 insertions(+), 52 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 2c441a5c57..57c566f890 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -195,7 +195,7 @@ def interface(self): def result(self): """Get result from result file (do not hold it in memory)""" return _load_resultfile( - op.join(self.output_dir(), 'result_%s.pklz' % self.name))[0] + op.join(self.output_dir(), 'result_%s.pklz' % self.name)) @property def inputs(self): @@ -518,7 +518,7 @@ def _get_inputs(self): logger.debug('input: %s', key) results_file = info[0] logger.debug('results file: %s', results_file) - outputs = _load_resultfile(results_file)[0].outputs + outputs = _load_resultfile(results_file).outputs if outputs is None: raise RuntimeError("""\ Error populating the input "%s" of node "%s": the results file of the source node \ @@ -565,34 +565,42 @@ def _run_interface(self, execute=True, updatehash=False): def _load_results(self): cwd = self.output_dir() - result, aggregate, attribute_error = _load_resultfile( - op.join(cwd, 'result_%s.pklz' % self.name)) + + try: + result = _load_resultfile( + op.join(cwd, 'result_%s.pklz' % self.name)) + except (traits.TraitError, EOFError): + logger.debug( + 'Error populating inputs/outputs, (re)aggregating results...') +
except (AttributeError, ImportError) as err: + logger.debug('attribute error: %s probably using ' + 'different trait pickled file', str(err)) + old_inputs = loadpkl(op.join(cwd, '_inputs.pklz')) + self.inputs.trait_set(**old_inputs) + else: + return result + # try aggregating first - if aggregate: - logger.debug('aggregating results') - if attribute_error: - old_inputs = loadpkl(op.join(cwd, '_inputs.pklz')) - self.inputs.trait_set(**old_inputs) - if not isinstance(self, MapNode): - self._copyfiles_to_wd(linksonly=True) - aggouts = self._interface.aggregate_outputs( - needed_outputs=self.needed_outputs) - runtime = Bunch( - cwd=cwd, - returncode=0, - environ=dict(os.environ), - hostname=socket.gethostname()) - result = InterfaceResult( - interface=self._interface.__class__, - runtime=runtime, - inputs=self._interface.inputs.get_traitsfree(), - outputs=aggouts) - _save_resultfile( - result, cwd, self.name, - rebase=str2bool(self.config['execution']['use_relative_paths'])) - else: - logger.debug('aggregating mapnode results') - result = self._run_interface() + if not isinstance(self, MapNode): + self._copyfiles_to_wd(linksonly=True) + aggouts = self._interface.aggregate_outputs( + needed_outputs=self.needed_outputs) + runtime = Bunch( + cwd=cwd, + returncode=0, + environ=dict(os.environ), + hostname=socket.gethostname()) + result = InterfaceResult( + interface=self._interface.__class__, + runtime=runtime, + inputs=self._interface.inputs.get_traitsfree(), + outputs=aggouts) + _save_resultfile( + result, cwd, self.name, + rebase=str2bool(self.config['execution']['use_relative_paths'])) + else: + logger.debug('aggregating mapnode results') + result = self._run_interface() return result def _run_command(self, execute, copyfiles=True): diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index 65170f14c9..8c1595c64d 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -38,11 +38,12 @@ write_rst_header, 
write_rst_dict, write_rst_list, + FileNotFoundError, ) from ...utils.misc import str2bool from ...utils.functions import create_function_from_source from ...interfaces.base.traits_extension import ( - rebase_path_traits, resolve_path_traits, OutputMultiPath, isdefined, Undefined, traits) + rebase_path_traits, resolve_path_traits, OutputMultiPath, isdefined, Undefined) from ...interfaces.base.support import Bunch, InterfaceResult from ...interfaces.base import CommandLine from ...interfaces.utility import IdentityInterface @@ -281,38 +282,22 @@ def load_resultfile(results_file, resolve=True): Returns ------- result : InterfaceResult structure - aggregate : boolean indicating whether node should aggregate_outputs - attribute error : boolean indicating whether there was some mismatch in - versions of traits used to store result and hence node needs to - rerun """ results_file = Path(results_file) - aggregate = True - result = None - attribute_error = False if not results_file.exists(): - return result, aggregate, attribute_error + raise FileNotFoundError(results_file) with indirectory(str(results_file.parent)): - try: - result = loadpkl(results_file) - except (traits.TraitError, EOFError): - logger.debug( - 'some file does not exist. 
hence trait cannot be set') - except (AttributeError, ImportError) as err: - attribute_error = True - logger.debug('attribute error: %s probably using ' - 'different trait pickled file', str(err)) - else: - aggregate = False + result = loadpkl(results_file) if resolve and result.outputs: try: outputs = result.outputs.get() except TypeError: # This is a Bunch - return result, aggregate, attribute_error + logger.debug('Outputs object of loaded result %s is a Bunch.', results_file) + return result logger.debug('Resolving paths in outputs loaded from results file.') for trait_name, old in list(outputs.items()): @@ -323,8 +308,7 @@ def load_resultfile(results_file, resolve=True): value = resolve_path_traits(result.outputs.trait(trait_name), old, results_file.parent) setattr(result.outputs, trait_name, value) - - return result, aggregate, attribute_error + return result def strip_temp(files, wd): From 63d5b68b2f8bbeaaed2356394bf42ec863c89d9c Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Tue, 10 Sep 2019 00:38:01 -0700 Subject: [PATCH 2/6] fix: revise ``report_crash`` to get along with the changes --- nipype/pipeline/plugins/tools.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py index 54fffd2398..3ddc37387c 100644 --- a/nipype/pipeline/plugins/tools.py +++ b/nipype/pipeline/plugins/tools.py @@ -16,7 +16,7 @@ from traceback import format_exception from ... 
import logging -from ...utils.filemanip import savepkl, crash2txt, makedirs +from ...utils.filemanip import savepkl, crash2txt, makedirs, FileNotFoundError logger = logging.getLogger('nipype.workflow') @@ -26,17 +26,29 @@ def report_crash(node, traceback=None, hostname=None): """ name = node._id host = None - if node.result and getattr(node.result, 'runtime'): - if isinstance(node.result.runtime, list): - host = node.result.runtime[0].hostname + traceback = traceback or format_exception(*sys.exc_info()) + + try: + result = node.result + except FileNotFoundError: + traceback += """ + +When creating this crashfile, the results file corresponding +to the node could not be found.""".splitlines(keepends=True) + except Exception as exc: + traceback += """ + +During the creation of this crashfile triggered by the above exception, +another exception occurred:\n\n{}.""".format(exc).splitlines(keepends=True) + else: + if isinstance(result.runtime, list): + host = result.runtime[0].hostname else: - host = node.result.runtime.hostname + host = result.runtime.hostname # Try everything to fill in the host host = host or hostname or gethostname() logger.error('Node %s failed to run on host %s.', name, host) - if not traceback: - traceback = format_exception(*sys.exc_info()) timeofcrash = strftime('%Y%m%d-%H%M%S') try: login_name = getpass.getuser() From be6024301c85f76c9054e8dc0da9e1c5401df032 Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 10 Sep 2019 07:43:17 -0700 Subject: [PATCH 3/6] tst: adding first test --- nipype/pipeline/engine/tests/test_utils.py | 16 ++++++- nipype/pipeline/engine/utils.py | 53 ++++++++++++---------- 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/nipype/pipeline/engine/tests/test_utils.py b/nipype/pipeline/engine/tests/test_utils.py index c462ea1533..eb4415f826 100644 --- a/nipype/pipeline/engine/tests/test_utils.py +++ b/nipype/pipeline/engine/tests/test_utils.py @@ -16,7 +16,7 @@ from ....interfaces import base as nib from 
....interfaces import utility as niu from .... import config -from ..utils import clean_working_directory, write_workflow_prov +from ..utils import clean_working_directory, write_workflow_prov, load_resultfile class InputSpec(nib.TraitedSpec): @@ -283,3 +283,17 @@ def test_modify_paths_bug(tmpdir): assert outputs.out_dict_path == {out_str: out_path} assert outputs.out_dict_str == {out_str: out_str} assert outputs.out_list == [out_str] * 2 + + +def test_save_load_resultfile(tmpdir): + tmpdir.chdir() + + spc = pe.Node(StrPathConfuser(in_str='2'), name='spc') + spc.base_dir = tmpdir.mkdir('node').strpath + result = spc.run() + + loaded_result = load_resultfile(tmpdir.join('node').join('spc').join('result_spc.pklz')) + + assert result.runtime.dictcopy() == loaded_result.runtime.dictcopy() + assert result.inputs == loaded_result.inputs + assert result.outputs.get() == loaded_result.outputs.get() diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index 8c1595c64d..9aa53fb88d 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -274,40 +274,43 @@ def load_resultfile(results_file, resolve=True): """ Load InterfaceResult file from path. - Parameter - --------- - path : base_dir of node - name : name of node + Parameters + ---------- + results_file : pathlike + Path to an existing pickle (``result_.pklz``) created with + ``save_resultfile``. + Raises ``FileNotFoundError`` if ``results_file`` does not exist. + resolve : bool + Determines whether relative paths will be resolved to absolute (default is ``True``). Returns ------- - result : InterfaceResult structure + result : InterfaceResult + A Nipype object containing the runtime, inputs, outputs and other interface information + such as a traceback in the case of errors. 
""" results_file = Path(results_file) - if not results_file.exists(): raise FileNotFoundError(results_file) - with indirectory(str(results_file.parent)): - result = loadpkl(results_file) - - if resolve and result.outputs: - try: - outputs = result.outputs.get() - except TypeError: # This is a Bunch - logger.debug('Outputs object of loaded result %s is a Bunch.', results_file) - return result - - logger.debug('Resolving paths in outputs loaded from results file.') - for trait_name, old in list(outputs.items()): - if isdefined(old): - if result.outputs.trait(trait_name).is_trait_type(OutputMultiPath): - old = result.outputs.trait(trait_name).handler.get_value( - result.outputs, trait_name) - value = resolve_path_traits(result.outputs.trait(trait_name), old, - results_file.parent) - setattr(result.outputs, trait_name, value) + result = loadpkl(results_file) + if resolve and result.outputs: + try: + outputs = result.outputs.get() + except TypeError: # This is a Bunch + logger.debug('Outputs object of loaded result %s is a Bunch.', results_file) + return result + + logger.debug('Resolving paths in outputs loaded from results file.') + for trait_name, old in list(outputs.items()): + if isdefined(old): + if result.outputs.trait(trait_name).is_trait_type(OutputMultiPath): + old = result.outputs.trait(trait_name).handler.get_value( + result.outputs, trait_name) + value = resolve_path_traits(result.outputs.trait(trait_name), old, + results_file.parent) + setattr(result.outputs, trait_name, value) return result From 49ada18b729690a19f852cb9d65fd7a086c1a229 Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 10 Sep 2019 09:38:42 -0700 Subject: [PATCH 4/6] fix: amend tests to work on py35 and add mobility test --- nipype/pipeline/engine/tests/test_utils.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/engine/tests/test_utils.py b/nipype/pipeline/engine/tests/test_utils.py index eb4415f826..3d89c88f17 100644 --- 
a/nipype/pipeline/engine/tests/test_utils.py +++ b/nipype/pipeline/engine/tests/test_utils.py @@ -16,7 +16,8 @@ from ....interfaces import base as nib from ....interfaces import utility as niu from .... import config -from ..utils import clean_working_directory, write_workflow_prov, load_resultfile +from ..utils import (clean_working_directory, write_workflow_prov, + save_resultfile, load_resultfile) class InputSpec(nib.TraitedSpec): @@ -286,14 +287,31 @@ def test_modify_paths_bug(tmpdir): def test_save_load_resultfile(tmpdir): + """Test minimally the save/load functions for result files.""" + from shutil import copytree tmpdir.chdir() spc = pe.Node(StrPathConfuser(in_str='2'), name='spc') spc.base_dir = tmpdir.mkdir('node').strpath result = spc.run() - loaded_result = load_resultfile(tmpdir.join('node').join('spc').join('result_spc.pklz')) + loaded_result = load_resultfile( + tmpdir.join('node').join('spc').join('result_spc.pklz').strpath) assert result.runtime.dictcopy() == loaded_result.runtime.dictcopy() assert result.inputs == loaded_result.inputs assert result.outputs.get() == loaded_result.outputs.get() + + # Test the mobility of the result file. 
+ copytree(tmpdir.join('node').strpath, tmpdir.join('node2').strpath) + save_resultfile(result, tmpdir.join('node2').strpath, 'spc', rebase=True) + loaded_result2 = load_resultfile( + tmpdir.join('node2').join('spc').join('result_spc.pklz').strpath) + + assert result.runtime.dictcopy() == loaded_result2.runtime.dictcopy() + assert result.inputs == loaded_result2.inputs + assert loaded_result2.outputs.get() != result.outputs.get() + newpath = result.outputs.out_path.replace('/node/', '/node2/') + assert loaded_result2.outputs.out_path == newpath + assert loaded_result2.outputs.out_tuple[0] == newpath + assert loaded_result2.outputs.out_dict_path['2'] == newpath From 0b510eded97e3837a35033cfdde2a7109bfc2e3e Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 10 Sep 2019 17:04:28 -0700 Subject: [PATCH 5/6] fix: address @satra's comment --- nipype/pipeline/plugins/tools.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py index 3ddc37387c..ebf023b787 100644 --- a/nipype/pipeline/plugins/tools.py +++ b/nipype/pipeline/plugins/tools.py @@ -41,10 +41,11 @@ def report_crash(node, traceback=None, hostname=None): During the creation of this crashfile triggered by the above exception, another exception occurred:\n\n{}.""".format(exc).splitlines(keepends=True) else: - if isinstance(result.runtime, list): - host = result.runtime[0].hostname - else: - host = result.runtime.hostname + if getattr(result, 'runtime', None): + if isinstance(result.runtime, list): + host = result.runtime[0].hostname + else: + host = result.runtime.hostname # Try everything to fill in the host host = host or hostname or gethostname() @@ -61,7 +62,7 @@ def report_crash(node, traceback=None, hostname=None): makedirs(crashdir, exist_ok=True) crashfile = os.path.join(crashdir, crashfile) - if node.config['execution']['crashfile_format'].lower() in ['text', 'txt']: + if 
node.config['execution']['crashfile_format'].lower() in ('text', 'txt', '.txt'): crashfile += '.txt' else: crashfile += '.pklz' From a200bc5b151a1e8473d251beb5eddcb1d61d76e7 Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 10 Sep 2019 17:04:48 -0700 Subject: [PATCH 6/6] fix: rebase arg not honored + improved tests (add xfail mark for python 2) --- nipype/pipeline/engine/tests/test_utils.py | 43 +++++++++++++++------- nipype/pipeline/engine/utils.py | 4 ++ nipype/utils/filemanip.py | 2 +- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/nipype/pipeline/engine/tests/test_utils.py b/nipype/pipeline/engine/tests/test_utils.py index 3d89c88f17..dd44f430e1 100644 --- a/nipype/pipeline/engine/tests/test_utils.py +++ b/nipype/pipeline/engine/tests/test_utils.py @@ -17,7 +17,7 @@ from ....interfaces import utility as niu from .... import config from ..utils import (clean_working_directory, write_workflow_prov, - save_resultfile, load_resultfile) + load_resultfile) class InputSpec(nib.TraitedSpec): @@ -286,13 +286,20 @@ def test_modify_paths_bug(tmpdir): assert outputs.out_list == [out_str] * 2 -def test_save_load_resultfile(tmpdir): +@pytest.mark.xfail(sys.version_info < (3, 4), + reason="rebase does not fully work with Python 2.7") +@pytest.mark.parametrize("use_relative", [True, False]) +def test_save_load_resultfile(tmpdir, use_relative): """Test minimally the save/load functions for result files.""" - from shutil import copytree + from shutil import copytree, rmtree tmpdir.chdir() + old_use_relative = config.getboolean('execution', 'use_relative_paths') + config.set('execution', 'use_relative_paths', use_relative) + spc = pe.Node(StrPathConfuser(in_str='2'), name='spc') spc.base_dir = tmpdir.mkdir('node').strpath + result = spc.run() loaded_result = load_resultfile( @@ -304,14 +311,22 @@ def test_save_load_resultfile(tmpdir): # Test the mobility of the result file. 
copytree(tmpdir.join('node').strpath, tmpdir.join('node2').strpath) - save_resultfile(result, tmpdir.join('node2').strpath, 'spc', rebase=True) - loaded_result2 = load_resultfile( - tmpdir.join('node2').join('spc').join('result_spc.pklz').strpath) - - assert result.runtime.dictcopy() == loaded_result2.runtime.dictcopy() - assert result.inputs == loaded_result2.inputs - assert loaded_result2.outputs.get() != result.outputs.get() - newpath = result.outputs.out_path.replace('/node/', '/node2/') - assert loaded_result2.outputs.out_path == newpath - assert loaded_result2.outputs.out_tuple[0] == newpath - assert loaded_result2.outputs.out_dict_path['2'] == newpath + rmtree(tmpdir.join('node').strpath) + + if use_relative: + loaded_result2 = load_resultfile( + tmpdir.join('node2').join('spc').join('result_spc.pklz').strpath) + + assert result.runtime.dictcopy() == loaded_result2.runtime.dictcopy() + assert result.inputs == loaded_result2.inputs + assert loaded_result2.outputs.get() != result.outputs.get() + newpath = result.outputs.out_path.replace('/node/', '/node2/') + assert loaded_result2.outputs.out_path == newpath + assert loaded_result2.outputs.out_tuple[0] == newpath + assert loaded_result2.outputs.out_dict_path['2'] == newpath + else: + with pytest.raises(nib.TraitError): + load_resultfile( + tmpdir.join('node2').join('spc').join('result_spc.pklz').strpath) + + config.set('execution', 'use_relative_paths', old_use_relative) diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index 9aa53fb88d..d22150b7b8 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -250,6 +250,10 @@ def save_resultfile(result, cwd, name, rebase=None): savepkl(resultsfile, result) return + if not rebase: + savepkl(resultsfile, result) + return + backup_traits = {} try: with indirectory(cwd): diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index 8fcfdc8beb..35b3de219b 100644 --- a/nipype/utils/filemanip.py +++ 
b/nipype/utils/filemanip.py @@ -708,7 +708,7 @@ def loadpkl(infile): Attempted to open a results file generated by Nipype version %s, \ with an incompatible Nipype version (%s)""", pkl_metadata['version'], version) raise e - fmlogger.error("""\ + fmlogger.warning("""\ No metadata was found in the pkl file. Make sure you are currently using \ the same Nipype version from the generated pkl.""") raise e