diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 2c441a5c57..57c566f890 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -195,7 +195,7 @@ def interface(self): def result(self): """Get result from result file (do not hold it in memory)""" return _load_resultfile( - op.join(self.output_dir(), 'result_%s.pklz' % self.name))[0] + op.join(self.output_dir(), 'result_%s.pklz' % self.name)) @property def inputs(self): @@ -518,7 +518,7 @@ def _get_inputs(self): logger.debug('input: %s', key) results_file = info[0] logger.debug('results file: %s', results_file) - outputs = _load_resultfile(results_file)[0].outputs + outputs = _load_resultfile(results_file).outputs if outputs is None: raise RuntimeError("""\ Error populating the input "%s" of node "%s": the results file of the source node \ @@ -565,34 +565,42 @@ def _run_interface(self, execute=True, updatehash=False): def _load_results(self): cwd = self.output_dir() - result, aggregate, attribute_error = _load_resultfile( - op.join(cwd, 'result_%s.pklz' % self.name)) + + try: + result = _load_resultfile( + op.join(cwd, 'result_%s.pklz' % self.name)) + except (traits.TraitError, EOFError): + logger.debug( + 'Error populating inputs/outputs, (re)aggregating results...') + except (AttributeError, ImportError) as err: + logger.debug('attribute error: %s probably using ' + 'different trait pickled file', str(err)) + old_inputs = loadpkl(op.join(cwd, '_inputs.pklz')) + self.inputs.trait_set(**old_inputs) + else: + return result + # try aggregating first - if aggregate: - logger.debug('aggregating results') - if attribute_error: - old_inputs = loadpkl(op.join(cwd, '_inputs.pklz')) - self.inputs.trait_set(**old_inputs) - if not isinstance(self, MapNode): - self._copyfiles_to_wd(linksonly=True) - aggouts = self._interface.aggregate_outputs( - needed_outputs=self.needed_outputs) - runtime = Bunch( - cwd=cwd, - returncode=0, - environ=dict(os.environ), - hostname=socket.gethostname()) - result = InterfaceResult( - interface=self._interface.__class__, - runtime=runtime, - inputs=self._interface.inputs.get_traitsfree(), - outputs=aggouts) - _save_resultfile( - result, cwd, self.name, - rebase=str2bool(self.config['execution']['use_relative_paths'])) - else: - logger.debug('aggregating mapnode results') - result = self._run_interface() + if not isinstance(self, MapNode): + self._copyfiles_to_wd(linksonly=True) + aggouts = self._interface.aggregate_outputs( + needed_outputs=self.needed_outputs) + runtime = Bunch( + cwd=cwd, + returncode=0, + environ=dict(os.environ), + hostname=socket.gethostname()) + result = InterfaceResult( + interface=self._interface.__class__, + runtime=runtime, + inputs=self._interface.inputs.get_traitsfree(), + outputs=aggouts) + _save_resultfile( + result, cwd, self.name, + rebase=str2bool(self.config['execution']['use_relative_paths'])) + else: + logger.debug('aggregating mapnode results') + result = self._run_interface() return result def _run_command(self, execute, copyfiles=True): diff --git a/nipype/pipeline/engine/tests/test_utils.py b/nipype/pipeline/engine/tests/test_utils.py index c462ea1533..dd44f430e1 100644 --- a/nipype/pipeline/engine/tests/test_utils.py +++ b/nipype/pipeline/engine/tests/test_utils.py @@ -16,7 +16,8 @@ from ....interfaces import base as nib from ....interfaces import utility as niu from .... import config -from ..utils import clean_working_directory, write_workflow_prov +from ..utils import (clean_working_directory, write_workflow_prov, + load_resultfile) class InputSpec(nib.TraitedSpec): @@ -283,3 +284,49 @@ def test_modify_paths_bug(tmpdir): assert outputs.out_dict_path == {out_str: out_path} assert outputs.out_dict_str == {out_str: out_str} assert outputs.out_list == [out_str] * 2 + + +@pytest.mark.xfail(sys.version_info < (3, 4), + reason="rebase does not fully work with Python 2.7") +@pytest.mark.parametrize("use_relative", [True, False]) +def test_save_load_resultfile(tmpdir, use_relative): + """Test minimally the save/load functions for result files.""" + from shutil import copytree, rmtree + tmpdir.chdir() + + old_use_relative = config.getboolean('execution', 'use_relative_paths') + config.set('execution', 'use_relative_paths', use_relative) + + spc = pe.Node(StrPathConfuser(in_str='2'), name='spc') + spc.base_dir = tmpdir.mkdir('node').strpath + + result = spc.run() + + loaded_result = load_resultfile( + tmpdir.join('node').join('spc').join('result_spc.pklz').strpath) + + assert result.runtime.dictcopy() == loaded_result.runtime.dictcopy() + assert result.inputs == loaded_result.inputs + assert result.outputs.get() == loaded_result.outputs.get() + + # Test the mobility of the result file. + copytree(tmpdir.join('node').strpath, tmpdir.join('node2').strpath) + rmtree(tmpdir.join('node').strpath) + + if use_relative: + loaded_result2 = load_resultfile( + tmpdir.join('node2').join('spc').join('result_spc.pklz').strpath) + + assert result.runtime.dictcopy() == loaded_result2.runtime.dictcopy() + assert result.inputs == loaded_result2.inputs + assert loaded_result2.outputs.get() != result.outputs.get() + newpath = result.outputs.out_path.replace('/node/', '/node2/') + assert loaded_result2.outputs.out_path == newpath + assert loaded_result2.outputs.out_tuple[0] == newpath + assert loaded_result2.outputs.out_dict_path['2'] == newpath + else: + with pytest.raises(nib.TraitError): + load_resultfile( + tmpdir.join('node2').join('spc').join('result_spc.pklz').strpath) + + config.set('execution', 'use_relative_paths', old_use_relative) diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index 65170f14c9..d22150b7b8 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -38,11 +38,12 @@ write_rst_header, write_rst_dict, write_rst_list, + FileNotFoundError, ) from ...utils.misc import str2bool from ...utils.functions import create_function_from_source from ...interfaces.base.traits_extension import ( - rebase_path_traits, resolve_path_traits, OutputMultiPath, isdefined, Undefined, traits) + rebase_path_traits, resolve_path_traits, OutputMultiPath, isdefined, Undefined) from ...interfaces.base.support import Bunch, InterfaceResult from ...interfaces.base import CommandLine from ...interfaces.utility import IdentityInterface @@ -249,6 +250,10 @@ def save_resultfile(result, cwd, name, rebase=None): savepkl(resultsfile, result) return + if not rebase: + savepkl(resultsfile, result) + return + backup_traits = {} try: with indirectory(cwd): @@ -273,58 +278,44 @@ def load_resultfile(results_file, resolve=True): """ Load InterfaceResult file from path. - Parameter - --------- - path : base_dir of node - name : name of node + Parameters + ---------- + results_file : pathlike + Path to an existing pickle (``result_.pklz``) created with + ``save_resultfile``. + Raises ``FileNotFoundError`` if ``results_file`` does not exist. + resolve : bool + Determines whether relative paths will be resolved to absolute (default is ``True``). Returns ------- - result : InterfaceResult structure - aggregate : boolean indicating whether node should aggregate_outputs - attribute error : boolean indicating whether there was some mismatch in - versions of traits used to store result and hence node needs to - rerun + result : InterfaceResult + A Nipype object containing the runtime, inputs, outputs and other interface information + such as a traceback in the case of errors. """ results_file = Path(results_file) - aggregate = True - result = None - attribute_error = False - if not results_file.exists(): - return result, aggregate, attribute_error + raise FileNotFoundError(results_file) - with indirectory(str(results_file.parent)): + result = loadpkl(results_file) + if resolve and result.outputs: try: - result = loadpkl(results_file) - except (traits.TraitError, EOFError): - logger.debug( - 'some file does not exist. hence trait cannot be set') - except (AttributeError, ImportError) as err: - attribute_error = True - logger.debug('attribute error: %s probably using ' - 'different trait pickled file', str(err)) - else: - aggregate = False - - if resolve and result.outputs: - try: - outputs = result.outputs.get() - except TypeError: # This is a Bunch - return result, aggregate, attribute_error - - logger.debug('Resolving paths in outputs loaded from results file.') - for trait_name, old in list(outputs.items()): - if isdefined(old): - if result.outputs.trait(trait_name).is_trait_type(OutputMultiPath): - old = result.outputs.trait(trait_name).handler.get_value( - result.outputs, trait_name) - value = resolve_path_traits(result.outputs.trait(trait_name), old, - results_file.parent) - setattr(result.outputs, trait_name, value) - - return result, aggregate, attribute_error + outputs = result.outputs.get() + except TypeError: # This is a Bunch + logger.debug('Outputs object of loaded result %s is a Bunch.', results_file) + return result + + logger.debug('Resolving paths in outputs loaded from results file.') + for trait_name, old in list(outputs.items()): + if isdefined(old): + if result.outputs.trait(trait_name).is_trait_type(OutputMultiPath): + old = result.outputs.trait(trait_name).handler.get_value( + result.outputs, trait_name) + value = resolve_path_traits(result.outputs.trait(trait_name), old, + results_file.parent) + setattr(result.outputs, trait_name, value) + return result def strip_temp(files, wd): diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py index 54fffd2398..ebf023b787 100644 --- a/nipype/pipeline/plugins/tools.py +++ b/nipype/pipeline/plugins/tools.py @@ -16,7 +16,7 @@ from traceback import format_exception from ... import logging -from ...utils.filemanip import savepkl, crash2txt, makedirs +from ...utils.filemanip import savepkl, crash2txt, makedirs, FileNotFoundError logger = logging.getLogger('nipype.workflow') @@ -26,17 +26,30 @@ def report_crash(node, traceback=None, hostname=None): """ name = node._id host = None - if node.result and getattr(node.result, 'runtime'): - if isinstance(node.result.runtime, list): - host = node.result.runtime[0].hostname - else: - host = node.result.runtime.hostname + traceback = traceback or format_exception(*sys.exc_info()) + + try: + result = node.result + except FileNotFoundError: + traceback += """ + +When creating this crashfile, the results file corresponding +to the node could not be found.""".splitlines(keepends=True) + except Exception as exc: + traceback += """ + +During the creation of this crashfile triggered by the above exception, +another exception occurred:\n\n{}.""".format(exc).splitlines(keepends=True) + else: + if getattr(result, 'runtime', None): + if isinstance(result.runtime, list): + host = result.runtime[0].hostname + else: + host = result.runtime.hostname # Try everything to fill in the host host = host or hostname or gethostname() logger.error('Node %s failed to run on host %s.', name, host) - if not traceback: - traceback = format_exception(*sys.exc_info()) timeofcrash = strftime('%Y%m%d-%H%M%S') try: login_name = getpass.getuser() @@ -49,7 +62,7 @@ def report_crash(node, traceback=None, hostname=None): makedirs(crashdir, exist_ok=True) crashfile = os.path.join(crashdir, crashfile) - if node.config['execution']['crashfile_format'].lower() in ['text', 'txt']: + if node.config['execution']['crashfile_format'].lower() in ('text', 'txt', '.txt'): crashfile += '.txt' else: crashfile += '.pklz' diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index 8fcfdc8beb..35b3de219b 100644 --- a/nipype/utils/filemanip.py +++ b/nipype/utils/filemanip.py @@ -708,7 +708,7 @@ def loadpkl(infile): Attempted to open a results file generated by Nipype version %s, \ with an incompatible Nipype version (%s)""", pkl_metadata['version'], version) raise e - fmlogger.error("""\ + fmlogger.warning("""\ No metadata was found in the pkl file. Make sure you are currently using \ the same Nipype version from the generated pkl.""") raise e