From d338ddd0a7475b6e4bd4d65f4e16fc4e6d6f8fc6 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Fri, 11 Oct 2019 14:28:34 -0700 Subject: [PATCH 1/2] ENH: Minimize the number of calls to ``_load_results`` when populating inputs This PR attempts to alleviate #3014 by opening the result file of a source node only once when that node feeds into several inputs of the node collecting inputs. Before these changes, a call to ``_load_results`` was issued for every input field that needed to collect its inputs from a past node. Now, all the inputs comming from the same node are put together and the ``_load_results`` function is called just once. The PR also modifies the manner the ``AttributeError``s (#3014) were handled to make it easier to spot whether an error occured while loading results araises when gathering the inputs of a node-to-be-run or elsewhere. --- nipype/pipeline/engine/nodes.py | 72 ++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index bf58934f5b..3fab9cfc2f 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -9,7 +9,7 @@ absolute_import) from builtins import range, str, bytes, open -from collections import OrderedDict +from collections import OrderedDict, defaultdict import os import os.path as op @@ -510,7 +510,8 @@ def _get_hashval(self): return self._hashed_inputs, self._hashvalue def _get_inputs(self): - """Retrieve inputs from pointers to results file + """ + Retrieve inputs from pointers to results files. This mechanism can be easily extended/replaced to retrieve data from other data sources (e.g., XNAT, HTTP, etc.,.) @@ -518,41 +519,46 @@ def _get_inputs(self): if self._got_inputs: return - logger.debug('Setting node inputs') + prev_results = defaultdict(list) for key, info in list(self.input_source.items()): - logger.debug('input: %s', key) - results_file = info[0] - logger.debug('results file: %s', results_file) - outputs = _load_resultfile(results_file).outputs + prev_results[info[0]].append((key, info[1])) + + logger.debug('[Node] Setting %d connected inputs from %d previous nodes.', + len(self.input_source), len(prev_results)) + + for results_fname, connections in list(prev_results.items()): + outputs = None + try: + outputs = _load_resultfile(results_fname).outputs + except AttributeError as e: + logger.critical('%s', e) + if outputs is None: raise RuntimeError("""\ -Error populating the input "%s" of node "%s": the results file of the source node \ -(%s) does not contain any outputs.""" % (key, self.name, results_file)) - output_value = Undefined - if isinstance(info[1], tuple): - output_name = info[1][0] - value = getattr(outputs, output_name) - if isdefined(value): - output_value = evaluate_connect_function( - info[1][1], info[1][2], value) - else: - output_name = info[1] +Error populating the inpus of node "%s": the results file of the source node \ +(%s) does not contain any outputs.""" % (self.name, results_fname)) + + for key, conn in connections: + output_value = Undefined + if isinstance(conn, tuple): + value = getattr(outputs, conn[0]) + if isdefined(value): + output_value = evaluate_connect_function( + conn[1], conn[2], value) + else: + output_value = getattr(outputs, conn) + try: - output_value = outputs.trait_get()[output_name] - except AttributeError: - output_value = outputs.dictcopy()[output_name] - logger.debug('output: %s', output_name) - try: - self.set_input(key, deepcopy(output_value)) - except traits.TraitError as e: - msg = ( - e.args[0], '', 'Error setting node input:', - 'Node: %s' % self.name, 'input: %s' % key, - 'results_file: %s' % results_file, - 'value: %s' % str(output_value), - ) - e.args = ('\n'.join(msg), ) - raise + self.set_input(key, deepcopy(output_value)) + except traits.TraitError as e: + msg = ( + e.args[0], '', 'Error setting node input:', + 'Node: %s' % self.name, 'input: %s' % key, + 'results_file: %s' % results_fname, + 'value: %s' % str(output_value), + ) + e.args = ('\n'.join(msg), ) + raise # Successfully set inputs self._got_inputs = True From 2e5436d31a7eecf52fc94fee8d6e01539f773e46 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 11 Oct 2019 16:03:30 -0700 Subject: [PATCH 2/2] fix: one typo in error message, exit early if no connections are to be evaluated --- nipype/pipeline/engine/nodes.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 3fab9cfc2f..eeb47f6d7a 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -516,15 +516,20 @@ def _get_inputs(self): This mechanism can be easily extended/replaced to retrieve data from other data sources (e.g., XNAT, HTTP, etc.,.) """ - if self._got_inputs: + if self._got_inputs: # Inputs cached + return + + if not self.input_source: # No previous nodes + self._got_inputs = True return prev_results = defaultdict(list) for key, info in list(self.input_source.items()): prev_results[info[0]].append((key, info[1])) - logger.debug('[Node] Setting %d connected inputs from %d previous nodes.', - len(self.input_source), len(prev_results)) + logger.debug( + '[Node] Setting %d connected inputs of node "%s" from %d previous nodes.', + len(self.input_source), self.name, len(prev_results)) for results_fname, connections in list(prev_results.items()): outputs = None @@ -535,7 +540,7 @@ def _get_inputs(self): if outputs is None: raise RuntimeError("""\ -Error populating the inpus of node "%s": the results file of the source node \ +Error populating the inputs of node "%s": the results file of the source node \ (%s) does not contain any outputs.""" % (self.name, results_fname)) for key, conn in connections: