diff --git a/CHANGES b/CHANGES index 73f4566e95..c1991d5f2b 100644 --- a/CHANGES +++ b/CHANGES @@ -1,9 +1,10 @@ Upcoming release (0.14.1) ========================= +* MAINT: Cleanup EngineBase (https://github.com/nipy/nipype/pull/2376) * FIX: Robustly handled outputs of 3dFWHMx across different versions of AFNI (https://github.com/nipy/nipype/pull/2373) * FIX: Cluster threshold in randomise + change default prefix (https://github.com/nipy/nipype/pull/2369) -* MAINT: Cleaning / simplify ``Node`` (https://github.com/nipy/nipype/pull/#2325) +* MAINT: Cleaning / simplify ``Node`` (https://github.com/nipy/nipype/pull/2325) * STY: Cleanup of PEP8 violations (https://github.com/nipy/nipype/pull/2358) * STY: Cleanup of trailing spaces and adding of missing newlines at end of files (https://github.com/nipy/nipype/pull/2355) diff --git a/nipype/interfaces/tests/test_io.py b/nipype/interfaces/tests/test_io.py index a2103eadf2..5bd082019a 100644 --- a/nipype/interfaces/tests/test_io.py +++ b/nipype/interfaces/tests/test_io.py @@ -147,7 +147,8 @@ def test_s3datagrabber(): "node_output": ["model"] }), ]) -def test_selectfiles(SF_args, inputs_att, expected): +def test_selectfiles(tmpdir, SF_args, inputs_att, expected): + tmpdir.chdir() base_dir = op.dirname(nipype.__file__) dg = nio.SelectFiles(base_directory=base_dir, **SF_args) for key, val in inputs_att.items(): diff --git a/nipype/pipeline/engine/base.py b/nipype/pipeline/engine/base.py index 07615d4164..51449632ba 100644 --- a/nipype/pipeline/engine/base.py +++ b/nipype/pipeline/engine/base.py @@ -18,18 +18,14 @@ absolute_import) from builtins import object -from future import standard_library -standard_library.install_aliases() - from copy import deepcopy import re import numpy as np -from ... import logging + +from ... import config from ...interfaces.base import DynamicTraitedSpec from ...utils.filemanip import loadpkl, savepkl -logger = logging.getLogger('workflow') - class EngineBase(object): """Defines common attributes and functions for workflows and nodes.""" @@ -47,35 +43,36 @@ def __init__(self, name=None, base_dir=None): default=None, which results in the use of mkdtemp """ + self._hierarchy = None + self._name = None + self.base_dir = base_dir - self.config = None - self._verify_name(name) + self.config = deepcopy(config._sections) self.name = name - # for compatibility with node expansion using iterables - self._id = self.name - self._hierarchy = None @property - def inputs(self): - raise NotImplementedError + def name(self): + return self._name - @property - def outputs(self): - raise NotImplementedError + @name.setter + def name(self, name): + if not name or not re.match(r'^[\w-]+$', name): + raise ValueError('[Workflow|Node] name "%s" is not valid.' % name) + self._name = name @property def fullname(self): - fullname = self.name if self._hierarchy: - fullname = self._hierarchy + '.' + self.name - return fullname + return '%s.%s' % (self._hierarchy, self.name) + return self.name @property - def itername(self): - itername = self._id - if self._hierarchy: - itername = self._hierarchy + '.' 
+ self._id - return itername + def inputs(self): + raise NotImplementedError + + @property + def outputs(self): + raise NotImplementedError def clone(self, name): """Clone an EngineBase object @@ -86,13 +83,10 @@ def clone(self, name): name : string (mandatory) A clone of node or workflow must have a new name """ - if (name is None) or (name == self.name): - raise Exception('Cloning requires a new name') - self._verify_name(name) + if name == self.name: + raise ValueError('Cloning requires a new name, "%s" is in use.' % name) clone = deepcopy(self) clone.name = name - clone._id = name - clone._hierarchy = None return clone def _check_outputs(self, parameter): @@ -103,17 +97,8 @@ def _check_inputs(self, parameter): return True return hasattr(self.inputs, parameter) - def _verify_name(self, name): - valid_name = bool(re.match('^[\w-]+$', name)) - if not valid_name: - raise ValueError('[Workflow|Node] name \'%s\' contains' - ' special characters' % name) - - def __repr__(self): - if self._hierarchy: - return '.'.join((self._hierarchy, self._id)) - else: - return '{}'.format(self._id) + def __str__(self): + return self.fullname def save(self, filename=None): if filename is None: diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index c9bbff1f12..7f99810f68 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -161,12 +161,13 @@ def __init__(self, super(Node, self).__init__(name, kwargs.get('base_dir')) - self.name = name self._interface = interface self._hierarchy = None self._got_inputs = False self._originputs = None self._output_dir = None + self._id = self.name # for compatibility with node expansion using iterables + self.iterables = iterables self.synchronize = synchronize self.itersource = itersource @@ -228,7 +229,6 @@ def n_procs(self): if hasattr(self._interface.inputs, 'num_threads') and isdefined( self._interface.inputs.num_threads): return self._interface.inputs.num_threads - return 1 @n_procs.setter @@ -240,6 +240,13 @@ def n_procs(self, value): if hasattr(self._interface.inputs, 'num_threads'): self._interface.inputs.num_threads = self._n_procs + @property + def itername(self): + itername = self._id + if self._hierarchy: + itername = self._hierarchy + '.' 
+ self._id + return itername + def output_dir(self): """Return the location of the output directory for the node""" # Output dir is cached diff --git a/nipype/pipeline/engine/tests/test_base.py b/nipype/pipeline/engine/tests/test_base.py new file mode 100644 index 0000000000..54356fd6c5 --- /dev/null +++ b/nipype/pipeline/engine/tests/test_base.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +from __future__ import print_function, unicode_literals + +import pytest +from ..base import EngineBase +from ....interfaces import base as nib + + +class InputSpec(nib.TraitedSpec): + input1 = nib.traits.Int(desc='a random int') + input2 = nib.traits.Int(desc='a random int') + input_file = nib.traits.File(desc='Random File') + + +class OutputSpec(nib.TraitedSpec): + output1 = nib.traits.List(nib.traits.Int, desc='outputs') + + +class EngineTestInterface(nib.BaseInterface): + input_spec = InputSpec + output_spec = OutputSpec + + def _run_interface(self, runtime): + runtime.returncode = 0 + return runtime + + def _list_outputs(self): + outputs = self._outputs().get() + outputs['output1'] = [1, self.inputs.input1] + return outputs + + +@pytest.mark.parametrize( + 'name', ['valid1', 'valid_node', 'valid-node', 'ValidNode0']) +def test_create(name): + base = EngineBase(name=name) + assert base.name == name + + +@pytest.mark.parametrize( + 'name', ['invalid*1', 'invalid.1', 'invalid@', 'in/valid', None]) +def test_create_invalid(name): + with pytest.raises(ValueError): + EngineBase(name=name) + + +def test_hierarchy(): + base = EngineBase(name='nodename') + base._hierarchy = 'some.history.behind' + + assert base.name == 'nodename' + assert base.fullname == 'some.history.behind.nodename' + + +def test_clone(): + base = EngineBase(name='nodename') + base2 = base.clone('newnodename') + + assert (base.base_dir == base2.base_dir and + base.config == base2.config and + base2.name == 'newnodename') + + with pytest.raises(ValueError): + base.clone('nodename') diff --git a/nipype/pipeline/engine/tests/test_engine.py b/nipype/pipeline/engine/tests/test_engine.py index 49b9f46852..151849241c 100644 --- a/nipype/pipeline/engine/tests/test_engine.py +++ b/nipype/pipeline/engine/tests/test_engine.py @@ -6,80 +6,21 @@ from __future__ import print_function from __future__ import unicode_literals -from builtins import str from builtins import open from copy import deepcopy from glob import glob import os -import sys -import networkx as nx import pytest from ... 
import engine as pe -from ....interfaces import base as nib - - -class InputSpec(nib.TraitedSpec): - input1 = nib.traits.Int(desc='a random int') - input2 = nib.traits.Int(desc='a random int') - input_file = nib.traits.File(desc='Random File') - - -class OutputSpec(nib.TraitedSpec): - output1 = nib.traits.List(nib.traits.Int, desc='outputs') - - -class EngineTestInterface(nib.BaseInterface): - input_spec = InputSpec - output_spec = OutputSpec - - def _run_interface(self, runtime): - runtime.returncode = 0 - return runtime - - def _list_outputs(self): - outputs = self._outputs().get() - outputs['output1'] = [1, self.inputs.input1] - return outputs - - -def test_init(): - with pytest.raises(TypeError): - pe.Workflow() - pipe = pe.Workflow(name='pipe') - assert type(pipe._graph) == nx.DiGraph - - -def test_connect(): - pipe = pe.Workflow(name='pipe') - mod1 = pe.Node(interface=EngineTestInterface(), name='mod1') - mod2 = pe.Node(interface=EngineTestInterface(), name='mod2') - pipe.connect([(mod1, mod2, [('output1', 'input1')])]) - - assert mod1 in pipe._graph.nodes() - assert mod2 in pipe._graph.nodes() - assert pipe._graph.get_edge_data(mod1, mod2) == { - 'connect': [('output1', 'input1')] - } - - -def test_add_nodes(): - pipe = pe.Workflow(name='pipe') - mod1 = pe.Node(interface=EngineTestInterface(), name='mod1') - mod2 = pe.Node(interface=EngineTestInterface(), name='mod2') - pipe.add_nodes([mod1, mod2]) - - assert mod1 in pipe._graph.nodes() - assert mod2 in pipe._graph.nodes() +from .test_base import EngineTestInterface # Test graph expansion. The following set tests the building blocks # of the graph expansion routine. # XXX - SG I'll create a graphical version of these tests and actually # ensure that all connections are tested later - - @pytest.mark.parametrize( "iterables, expected", [ @@ -354,279 +295,6 @@ def test_itersource_synchronize2_expansion(): assert len(pe.generate_expanded_graph(wf3._flatgraph).nodes()) == 30 -def test_disconnect(): - from nipype.interfaces.utility import IdentityInterface - a = pe.Node(IdentityInterface(fields=['a', 'b']), name='a') - b = pe.Node(IdentityInterface(fields=['a', 'b']), name='b') - flow1 = pe.Workflow(name='test') - flow1.connect(a, 'a', b, 'a') - flow1.disconnect(a, 'a', b, 'a') - assert list(flow1._graph.edges()) == [] - - -def test_doubleconnect(): - from nipype.interfaces.utility import IdentityInterface - a = pe.Node(IdentityInterface(fields=['a', 'b']), name='a') - b = pe.Node(IdentityInterface(fields=['a', 'b']), name='b') - flow1 = pe.Workflow(name='test') - flow1.connect(a, 'a', b, 'a') - x = lambda: flow1.connect(a, 'b', b, 'a') - with pytest.raises(Exception) as excinfo: - x() - assert "Trying to connect" in str(excinfo.value) - - c = pe.Node(IdentityInterface(fields=['a', 'b']), name='c') - flow1 = pe.Workflow(name='test2') - x = lambda: flow1.connect([(a, c, [('b', 'b')]), (b, c, [('a', 'b')])]) - with pytest.raises(Exception) as excinfo: - x() - assert "Trying to connect" in str(excinfo.value) - - -''' -Test for order of iterables - -import nipype.pipeline.engine as pe -import nipype.interfaces.utility as niu - -wf1 = pe.Workflow(name='wf1') -node1 = pe.Node(interface=niu.IdentityInterface(fields=['a1','b1']), name='node1') -node1.iterables = ('a1', [1,2]) -wf1.add_nodes([node1]) - -wf2 = pe.Workflow(name='wf2') -node2 = pe.Node(interface=niu.IdentityInterface(fields=['a2','b2']), name='node2') -wf2.add_nodes([node2]) -wf1.connect(node1, 'a1', wf2, 'node2.a2') - -node4 = 
pe.Node(interface=niu.IdentityInterface(fields=['a4','b4']), name='node4') -#node4.iterables = ('a4', [5,6]) -wf2.connect(node2, 'b2', node4, 'b4') - -wf3 = pe.Workflow(name='wf3') -node3 = pe.Node(interface=niu.IdentityInterface(fields=['a3','b3']), name='node3') -node3.iterables = ('b3', [3,4]) -wf3.add_nodes([node3]) -wf1.connect(wf3, 'node3.b3', wf2, 'node2.b2') - -wf1.base_dir = os.path.join(os.getcwd(),'testit') -wf1.run(inseries=True, createdirsonly=True) - -wf1.write_graph(graph2use='exec') -''' -''' -import nipype.pipeline.engine as pe -import nipype.interfaces.spm as spm -import os -from io import StringIO -from nipype.utils.config import config - -config.readfp(StringIO(""" -[execution] -remove_unnecessary_outputs = true -""")) - - -segment = pe.Node(interface=spm.Segment(), name="segment") -segment.inputs.data = os.path.abspath("data/T1.nii") -segment.inputs.gm_output_type = [True, True, True] -segment.inputs.wm_output_type = [True, True, True] - - -smooth_gm = pe.Node(interface=spm.Smooth(), name="smooth_gm") - -workflow = pe.Workflow(name="workflow_cleanup_test") -workflow.base_dir = os.path.abspath('./workflow_cleanup_test') - -workflow.connect([(segment, smooth_gm, [('native_gm_image','in_files')])]) - -workflow.run() - -#adding new node that uses one of the previously deleted outputs of segment; this should force segment to rerun -smooth_wm = pe.Node(interface=spm.Smooth(), name="smooth_wm") - -workflow.connect([(segment, smooth_wm, [('native_wm_image','in_files')])]) - -workflow.run() - -workflow.run() -''' - -# Node - - -def test_node_init(): - with pytest.raises(Exception): - pe.Node() - try: - node = pe.Node(EngineTestInterface, name='test') - except IOError: - exception = True - else: - exception = False - assert exception - - -def test_workflow_add(): - from nipype.interfaces.utility import IdentityInterface as ii - n1 = pe.Node(ii(fields=['a', 'b']), name='n1') - n2 = pe.Node(ii(fields=['c', 'd']), name='n2') - n3 = pe.Node(ii(fields=['c', 'd']), name='n1') - w1 = pe.Workflow(name='test') - w1.connect(n1, 'a', n2, 'c') - for node in [n1, n2, n3]: - with pytest.raises(IOError): - w1.add_nodes([node]) - with pytest.raises(IOError): - w1.connect([(w1, n2, [('n1.a', 'd')])]) - - -def test_node_get_output(): - mod1 = pe.Node(interface=EngineTestInterface(), name='mod1') - mod1.inputs.input1 = 1 - mod1.run() - assert mod1.get_output('output1') == [1, 1] - mod1._result = None - assert mod1.get_output('output1') == [1, 1] - - -def test_mapnode_iterfield_check(): - mod1 = pe.MapNode(EngineTestInterface(), iterfield=['input1'], name='mod1') - with pytest.raises(ValueError): - mod1._check_iterfield() - mod1 = pe.MapNode( - EngineTestInterface(), iterfield=['input1', 'input2'], name='mod1') - mod1.inputs.input1 = [1, 2] - mod1.inputs.input2 = 3 - with pytest.raises(ValueError): - mod1._check_iterfield() - - -@pytest.mark.parametrize("x_inp, f_exp", - [(3, [6]), ([2, 3], [4, 6]), ((2, 3), [4, 6]), - (range(3), [0, 2, 4]), ("Str", ["StrStr"]), - (["Str1", "Str2"], ["Str1Str1", "Str2Str2"])]) -def test_mapnode_iterfield_type(x_inp, f_exp): - from nipype import MapNode, Function - - def double_func(x): - return 2 * x - - double = Function(["x"], ["f_x"], double_func) - - double_node = MapNode(double, name="double", iterfield=["x"]) - double_node.inputs.x = x_inp - - res = double_node.run() - assert res.outputs.f_x == f_exp - - -def test_mapnode_nested(tmpdir): - tmpdir.chdir() - from nipype import MapNode, Function - - def func1(in1): - return in1 + 1 - - n1 = MapNode( - 
Function(input_names=['in1'], output_names=['out'], function=func1), - iterfield=['in1'], - nested=True, - name='n1') - n1.inputs.in1 = [[1, [2]], 3, [4, 5]] - n1.run() - assert n1.get_output('out') == [[2, [3]], 4, [5, 6]] - - n2 = MapNode( - Function(input_names=['in1'], output_names=['out'], function=func1), - iterfield=['in1'], - nested=False, - name='n1') - n2.inputs.in1 = [[1, [2]], 3, [4, 5]] - - with pytest.raises(Exception) as excinfo: - n2.run() - assert "can only concatenate list" in str(excinfo.value) - - -def test_mapnode_expansion(tmpdir): - tmpdir.chdir() - from nipype import MapNode, Function - - def func1(in1): - return in1 + 1 - - mapnode = MapNode( - Function(function=func1), - iterfield='in1', - name='mapnode', - n_procs=2, - mem_gb=2) - mapnode.inputs.in1 = [1, 2] - - for idx, node in mapnode._make_nodes(): - for attr in ('overwrite', 'run_without_submitting', 'plugin_args'): - assert getattr(node, attr) == getattr(mapnode, attr) - for attr in ('_n_procs', '_mem_gb'): - assert (getattr(node, attr) == getattr(mapnode, attr)) - - -def test_node_hash(tmpdir): - from nipype.interfaces.utility import Function - tmpdir.chdir() - - def func1(): - return 1 - - def func2(a): - return a + 1 - - n1 = pe.Node( - Function(input_names=[], output_names=['a'], function=func1), - name='n1') - n2 = pe.Node( - Function(input_names=['a'], output_names=['b'], function=func2), - name='n2') - w1 = pe.Workflow(name='test') - modify = lambda x: x + 1 - n1.inputs.a = 1 - w1.connect(n1, ('a', modify), n2, 'a') - w1.base_dir = os.getcwd() - # generate outputs - w1.run(plugin='Linear') - # ensure plugin is being called - w1.config['execution'] = { - 'stop_on_first_crash': 'true', - 'local_hash_check': 'false', - 'crashdump_dir': os.getcwd() - } - # create dummy distributed plugin class - from nipype.pipeline.plugins.base import DistributedPluginBase - - # create a custom exception - class EngineTestException(Exception): - pass - - class RaiseError(DistributedPluginBase): - def _submit_job(self, node, updatehash=False): - raise EngineTestException('Submit called') - - # check if a proper exception is raised - with pytest.raises(EngineTestException) as excinfo: - w1.run(plugin=RaiseError()) - assert 'Submit called' == str(excinfo.value) - - # rerun to ensure we have outputs - w1.run(plugin='Linear') - # set local check - w1.config['execution'] = { - 'stop_on_first_crash': 'true', - 'local_hash_check': 'true', - 'crashdump_dir': os.getcwd() - } - - w1.run(plugin=RaiseError()) - def test_old_config(tmpdir): tmpdir.chdir() diff --git a/nipype/pipeline/engine/tests/test_nodes.py b/nipype/pipeline/engine/tests/test_nodes.py new file mode 100644 index 0000000000..51fda95f63 --- /dev/null +++ b/nipype/pipeline/engine/tests/test_nodes.py @@ -0,0 +1,292 @@ +# -*- coding: utf-8 -*- +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +from __future__ import print_function, unicode_literals +from builtins import str +import os +from copy import deepcopy +import pytest + +from .... import config +from ....interfaces import utility as niu +from ... 
import engine as pe +from ..utils import merge_dict +from .test_base import EngineTestInterface +from .test_utils import UtilsTestInterface + +''' +Test for order of iterables + +import nipype.pipeline.engine as pe +import nipype.interfaces.utility as niu + +wf1 = pe.Workflow(name='wf1') +node1 = pe.Node(interface=niu.IdentityInterface(fields=['a1','b1']), name='node1') +node1.iterables = ('a1', [1,2]) +wf1.add_nodes([node1]) + +wf2 = pe.Workflow(name='wf2') +node2 = pe.Node(interface=niu.IdentityInterface(fields=['a2','b2']), name='node2') +wf2.add_nodes([node2]) +wf1.connect(node1, 'a1', wf2, 'node2.a2') + +node4 = pe.Node(interface=niu.IdentityInterface(fields=['a4','b4']), name='node4') +#node4.iterables = ('a4', [5,6]) +wf2.connect(node2, 'b2', node4, 'b4') + +wf3 = pe.Workflow(name='wf3') +node3 = pe.Node(interface=niu.IdentityInterface(fields=['a3','b3']), name='node3') +node3.iterables = ('b3', [3,4]) +wf3.add_nodes([node3]) +wf1.connect(wf3, 'node3.b3', wf2, 'node2.b2') + +wf1.base_dir = os.path.join(os.getcwd(),'testit') +wf1.run(inseries=True, createdirsonly=True) + +wf1.write_graph(graph2use='exec') +''' +''' +import nipype.pipeline.engine as pe +import nipype.interfaces.spm as spm +import os +from io import StringIO +from nipype.utils.config import config + +config.readfp(StringIO(""" +[execution] +remove_unnecessary_outputs = true +""")) + + +segment = pe.Node(interface=spm.Segment(), name="segment") +segment.inputs.data = os.path.abspath("data/T1.nii") +segment.inputs.gm_output_type = [True, True, True] +segment.inputs.wm_output_type = [True, True, True] + + +smooth_gm = pe.Node(interface=spm.Smooth(), name="smooth_gm") + +workflow = pe.Workflow(name="workflow_cleanup_test") +workflow.base_dir = os.path.abspath('./workflow_cleanup_test') + +workflow.connect([(segment, smooth_gm, [('native_gm_image','in_files')])]) + +workflow.run() + +#adding new node that uses one of the previously deleted outputs of segment; this should force segment to rerun +smooth_wm = pe.Node(interface=spm.Smooth(), name="smooth_wm") + +workflow.connect([(segment, smooth_wm, [('native_wm_image','in_files')])]) + +workflow.run() + +workflow.run() +''' + +# Node + + +def test_node_init(): + with pytest.raises(TypeError): + pe.Node() + with pytest.raises(IOError): + pe.Node(EngineTestInterface, name='test') + + +def test_node_get_output(): + mod1 = pe.Node(interface=EngineTestInterface(), name='mod1') + mod1.inputs.input1 = 1 + mod1.run() + assert mod1.get_output('output1') == [1, 1] + mod1._result = None + assert mod1.get_output('output1') == [1, 1] + + +def test_mapnode_iterfield_check(): + mod1 = pe.MapNode(EngineTestInterface(), iterfield=['input1'], name='mod1') + with pytest.raises(ValueError): + mod1._check_iterfield() + mod1 = pe.MapNode( + EngineTestInterface(), iterfield=['input1', 'input2'], name='mod1') + mod1.inputs.input1 = [1, 2] + mod1.inputs.input2 = 3 + with pytest.raises(ValueError): + mod1._check_iterfield() + + +@pytest.mark.parametrize("x_inp, f_exp", + [(3, [6]), ([2, 3], [4, 6]), ((2, 3), [4, 6]), + (range(3), [0, 2, 4]), ("Str", ["StrStr"]), + (["Str1", "Str2"], ["Str1Str1", "Str2Str2"])]) +def test_mapnode_iterfield_type(x_inp, f_exp): + from nipype import MapNode, Function + + def double_func(x): + return 2 * x + + double = Function(["x"], ["f_x"], double_func) + + double_node = MapNode(double, name="double", iterfield=["x"]) + double_node.inputs.x = x_inp + + res = double_node.run() + assert res.outputs.f_x == f_exp + + +def test_mapnode_nested(tmpdir): + tmpdir.chdir() + from 
nipype import MapNode, Function
+
+    def func1(in1):
+        return in1 + 1
+
+    n1 = MapNode(
+        Function(input_names=['in1'], output_names=['out'], function=func1),
+        iterfield=['in1'],
+        nested=True,
+        name='n1')
+    n1.inputs.in1 = [[1, [2]], 3, [4, 5]]
+    n1.run()
+    assert n1.get_output('out') == [[2, [3]], 4, [5, 6]]
+
+    n2 = MapNode(
+        Function(input_names=['in1'], output_names=['out'], function=func1),
+        iterfield=['in1'],
+        nested=False,
+        name='n1')
+    n2.inputs.in1 = [[1, [2]], 3, [4, 5]]
+
+    with pytest.raises(Exception) as excinfo:
+        n2.run()
+    assert "can only concatenate list" in str(excinfo.value)
+
+
+def test_mapnode_expansion(tmpdir):
+    tmpdir.chdir()
+    from nipype import MapNode, Function
+
+    def func1(in1):
+        return in1 + 1
+
+    mapnode = MapNode(
+        Function(function=func1),
+        iterfield='in1',
+        name='mapnode',
+        n_procs=2,
+        mem_gb=2)
+    mapnode.inputs.in1 = [1, 2]
+
+    for idx, node in mapnode._make_nodes():
+        for attr in ('overwrite', 'run_without_submitting', 'plugin_args'):
+            assert getattr(node, attr) == getattr(mapnode, attr)
+        for attr in ('_n_procs', '_mem_gb'):
+            assert (getattr(node, attr) == getattr(mapnode, attr))
+
+
+def test_node_hash(tmpdir):
+    from nipype.interfaces.utility import Function
+    tmpdir.chdir()
+
+    def func1():
+        return 1
+
+    def func2(a):
+        return a + 1
+
+    n1 = pe.Node(
+        Function(input_names=[], output_names=['a'], function=func1),
+        name='n1')
+    n2 = pe.Node(
+        Function(input_names=['a'], output_names=['b'], function=func2),
+        name='n2')
+    w1 = pe.Workflow(name='test')
+
+    def modify(x):
+        return x + 1
+    n1.inputs.a = 1
+    w1.connect(n1, ('a', modify), n2, 'a')
+    w1.base_dir = os.getcwd()
+
+    # generate outputs
+    w1.run(plugin='Linear')
+    # ensure plugin is being called
+    w1.config['execution'] = {
+        'stop_on_first_crash': 'true',
+        'local_hash_check': 'false',
+        'crashdump_dir': os.getcwd()
+    }
+
+    # create dummy distributed plugin class
+    from nipype.pipeline.plugins.base import DistributedPluginBase
+
+    # create a custom exception
+    class EngineTestException(Exception):
+        pass
+
+    class RaiseError(DistributedPluginBase):
+        def _submit_job(self, node, updatehash=False):
+            raise EngineTestException('Submit called')
+
+    # check if a proper exception is raised
+    with pytest.raises(EngineTestException) as excinfo:
+        w1.run(plugin=RaiseError())
+    assert 'Submit called' == str(excinfo.value)
+
+    # rerun to ensure we have outputs
+    w1.run(plugin='Linear')
+    # set local check
+    w1.config['execution'] = {
+        'stop_on_first_crash': 'true',
+        'local_hash_check': 'true',
+        'crashdump_dir': os.getcwd()
+    }
+
+    w1.run(plugin=RaiseError())
+
+
+def test_outputs_removal(tmpdir):
+    def test_function(arg1):
+        import os
+        file1 = os.path.join(os.getcwd(), 'file1.txt')
+        file2 = os.path.join(os.getcwd(), 'file2.txt')
+        with open(file1, 'wt') as fp:
+            fp.write('%d' % arg1)
+        with open(file2, 'wt') as fp:
+            fp.write('%d' % arg1)
+        return file1, file2
+
+    n1 = pe.Node(
+        niu.Function(
+            input_names=['arg1'],
+            output_names=['file1', 'file2'],
+            function=test_function),
+        base_dir=tmpdir.strpath,
+        name='testoutputs')
+    n1.inputs.arg1 = 1
+    n1.config = {'execution': {'remove_unnecessary_outputs': True}}
+    n1.config = merge_dict(deepcopy(config._sections), n1.config)
+    n1.run()
+    assert tmpdir.join(n1.name, 'file1.txt').check()
+    assert tmpdir.join(n1.name, 'file2.txt').check()
+    n1.needed_outputs = ['file2']
+    n1.run()
+    assert not tmpdir.join(n1.name, 'file1.txt').check()
+    assert tmpdir.join(n1.name, 'file2.txt').check()
+
+
+def test_inputs_removal(tmpdir):
+    file1 = 
tmpdir.join('file1.txt') + file1.write('dummy_file') + n1 = pe.Node( + UtilsTestInterface(), base_dir=tmpdir.strpath, name='testinputs') + n1.inputs.in_file = file1.strpath + n1.config = {'execution': {'keep_inputs': True}} + n1.config = merge_dict(deepcopy(config._sections), n1.config) + n1.run() + assert tmpdir.join(n1.name, 'file1.txt').check() + n1.inputs.in_file = file1.strpath + n1.config = {'execution': {'keep_inputs': False}} + n1.config = merge_dict(deepcopy(config._sections), n1.config) + n1.overwrite = True + n1.run() + assert not tmpdir.join(n1.name, 'file1.txt').check() diff --git a/nipype/pipeline/engine/tests/test_utils.py b/nipype/pipeline/engine/tests/test_utils.py index 92d5b703bb..42f8b2434e 100644 --- a/nipype/pipeline/engine/tests/test_utils.py +++ b/nipype/pipeline/engine/tests/test_utils.py @@ -10,14 +10,35 @@ import os import sys from copy import deepcopy -from shutil import rmtree import pytest from ... import engine as pe from ....interfaces import base as nib from ....interfaces import utility as niu from .... import config -from ..utils import merge_dict, clean_working_directory, write_workflow_prov +from ..utils import clean_working_directory, write_workflow_prov + + +class InputSpec(nib.TraitedSpec): + in_file = nib.File(exists=True, copyfile=True) + + +class OutputSpec(nib.TraitedSpec): + output1 = nib.traits.List(nib.traits.Int, desc='outputs') + + +class UtilsTestInterface(nib.BaseInterface): + input_spec = InputSpec + output_spec = OutputSpec + + def _run_interface(self, runtime): + runtime.returncode = 0 + return runtime + + def _list_outputs(self): + outputs = self._outputs().get() + outputs['output1'] = [1] + return outputs def test_identitynode_removal(tmpdir): @@ -99,204 +120,11 @@ class InputSpec(nib.TraitedSpec): config.set_default_config() -def test_outputs_removal(tmpdir): - def test_function(arg1): - import os - file1 = os.path.join(os.getcwd(), 'file1.txt') - file2 = os.path.join(os.getcwd(), 'file2.txt') - fp = open(file1, 'wt') - fp.write('%d' % arg1) - fp.close() - fp = open(file2, 'wt') - fp.write('%d' % arg1) - fp.close() - return file1, file2 - - n1 = pe.Node( - niu.Function( - input_names=['arg1'], - output_names=['file1', 'file2'], - function=test_function), - base_dir=tmpdir.strpath, - name='testoutputs') - n1.inputs.arg1 = 1 - n1.config = {'execution': {'remove_unnecessary_outputs': True}} - n1.config = merge_dict(deepcopy(config._sections), n1.config) - n1.run() - assert tmpdir.join(n1.name, 'file1.txt').check() - assert tmpdir.join(n1.name, 'file1.txt').check() - n1.needed_outputs = ['file2'] - n1.run() - assert not tmpdir.join(n1.name, 'file1.txt').check() - assert tmpdir.join(n1.name, 'file2.txt').check() - - -class InputSpec(nib.TraitedSpec): - in_file = nib.File(exists=True, copyfile=True) - - -class OutputSpec(nib.TraitedSpec): - output1 = nib.traits.List(nib.traits.Int, desc='outputs') - - -class UtilsTestInterface(nib.BaseInterface): - input_spec = InputSpec - output_spec = OutputSpec - - def _run_interface(self, runtime): - runtime.returncode = 0 - return runtime - - def _list_outputs(self): - outputs = self._outputs().get() - outputs['output1'] = [1] - return outputs - - -def test_inputs_removal(tmpdir): - file1 = tmpdir.join('file1.txt') - file1.write('dummy_file') - n1 = pe.Node( - UtilsTestInterface(), base_dir=tmpdir.strpath, name='testinputs') - n1.inputs.in_file = file1.strpath - n1.config = {'execution': {'keep_inputs': True}} - n1.config = merge_dict(deepcopy(config._sections), n1.config) - n1.run() - assert 
tmpdir.join(n1.name, 'file1.txt').check() - n1.inputs.in_file = file1.strpath - n1.config = {'execution': {'keep_inputs': False}} - n1.config = merge_dict(deepcopy(config._sections), n1.config) - n1.overwrite = True - n1.run() - assert not tmpdir.join(n1.name, 'file1.txt').check() - - -def test_outputs_removal_wf(tmpdir): - def test_function(arg1): - import os - file1 = os.path.join(os.getcwd(), 'file1.txt') - file2 = os.path.join(os.getcwd(), 'file2.txt') - file3 = os.path.join(os.getcwd(), 'file3.txt') - file4 = os.path.join(os.getcwd(), 'subdir', 'file1.txt') - files = [file1, file2, file3, file4] - os.mkdir("subdir") - for filename in files: - with open(filename, 'wt') as fp: - fp.write('%d' % arg1) - return file1, file2, os.path.join(os.getcwd(), "subdir") - - def test_function2(in_file, arg): - import os - in_arg = open(in_file).read() - file1 = os.path.join(os.getcwd(), 'file1.txt') - file2 = os.path.join(os.getcwd(), 'file2.txt') - file3 = os.path.join(os.getcwd(), 'file3.txt') - files = [file1, file2, file3] - for filename in files: - with open(filename, 'wt') as fp: - fp.write('%d' % arg + in_arg) - return file1, file2, 1 - - def test_function3(arg): - import os - return arg - - for plugin in ('Linear', ): # , 'MultiProc'): - n1 = pe.Node( - niu.Function( - input_names=['arg1'], - output_names=['out_file1', 'out_file2', 'dir'], - function=test_function), - name='n1', - base_dir=tmpdir.strpath) - n1.inputs.arg1 = 1 - - n2 = pe.Node( - niu.Function( - input_names=['in_file', 'arg'], - output_names=['out_file1', 'out_file2', 'n'], - function=test_function2), - name='n2', - base_dir=tmpdir.strpath) - n2.inputs.arg = 2 - - n3 = pe.Node( - niu.Function( - input_names=['arg'], - output_names=['n'], - function=test_function3), - name='n3', - base_dir=tmpdir.strpath) - - wf = pe.Workflow( - name="node_rem_test" + plugin, base_dir=tmpdir.strpath) - wf.connect(n1, "out_file1", n2, "in_file") - - wf.run(plugin='Linear') - - for remove_unnecessary_outputs in [True, False]: - config.set_default_config() - wf.config = { - 'execution': { - 'remove_unnecessary_outputs': remove_unnecessary_outputs - } - } - rmtree(os.path.join(wf.base_dir, wf.name)) - wf.run(plugin=plugin) - - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n1.name, - 'file2.txt')) != remove_unnecessary_outputs - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n1.name, "subdir", - 'file1.txt')) != remove_unnecessary_outputs - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n1.name, 'file1.txt')) - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n1.name, - 'file3.txt')) != remove_unnecessary_outputs - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n2.name, 'file1.txt')) - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n2.name, 'file2.txt')) - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n2.name, - 'file3.txt')) != remove_unnecessary_outputs - - n4 = pe.Node(UtilsTestInterface(), name='n4', base_dir=tmpdir.strpath) - wf.connect(n2, "out_file1", n4, "in_file") - - def pick_first(l): - return l[0] - - wf.connect(n4, ("output1", pick_first), n3, "arg") - for remove_unnecessary_outputs in [True, False]: - for keep_inputs in [True, False]: - config.set_default_config() - wf.config = { - 'execution': { - 'keep_inputs': keep_inputs, - 'remove_unnecessary_outputs': - remove_unnecessary_outputs - } - } - rmtree(os.path.join(wf.base_dir, wf.name)) - wf.run(plugin=plugin) - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n2.name, 'file1.txt')) 
- assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n2.name, - 'file2.txt')) != remove_unnecessary_outputs - assert os.path.exists( - os.path.join(wf.base_dir, wf.name, n4.name, - 'file1.txt')) == keep_inputs - - -def fwhm(fwhm): - return fwhm - - def create_wf(name): + """Creates a workflow for the following tests""" + def fwhm(fwhm): + return fwhm + pipe = pe.Workflow(name=name) process = pe.Node( niu.Function( diff --git a/nipype/pipeline/engine/tests/test_workflows.py b/nipype/pipeline/engine/tests/test_workflows.py index d2b23ae586..2b32311d09 100644 --- a/nipype/pipeline/engine/tests/test_workflows.py +++ b/nipype/pipeline/engine/tests/test_workflows.py @@ -3,10 +3,85 @@ # vi: set ft=python sts=4 ts=4 sw=4 et: """Tests for the engine workflows module """ +import os +from shutil import rmtree +from itertools import product import pytest +import networkx as nx -from ... import engine as pe +from .... import config from ....interfaces import utility as niu +from ... import engine as pe +from .test_base import EngineTestInterface +from .test_utils import UtilsTestInterface + + +def test_init(): + with pytest.raises(TypeError): + pe.Workflow() + pipe = pe.Workflow(name='pipe') + assert type(pipe._graph) == nx.DiGraph + + +def test_connect(): + pipe = pe.Workflow(name='pipe') + mod2 = pe.Node(EngineTestInterface(), name='mod2') + mod1 = pe.Node(EngineTestInterface(), name='mod1') + pipe.connect([(mod1, mod2, [('output1', 'input1')])]) + + assert mod1 in pipe._graph.nodes() + assert mod2 in pipe._graph.nodes() + assert pipe._graph.get_edge_data(mod1, mod2) == { + 'connect': [('output1', 'input1')] + } + + +def test_add_nodes(): + pipe = pe.Workflow(name='pipe') + mod1 = pe.Node(EngineTestInterface(), name='mod1') + mod2 = pe.Node(EngineTestInterface(), name='mod2') + pipe.add_nodes([mod1, mod2]) + + assert mod1 in pipe._graph.nodes() + assert mod2 in pipe._graph.nodes() + + +def test_disconnect(): + a = pe.Node(niu.IdentityInterface(fields=['a', 'b']), name='a') + b = pe.Node(niu.IdentityInterface(fields=['a', 'b']), name='b') + flow1 = pe.Workflow(name='test') + flow1.connect(a, 'a', b, 'a') + flow1.disconnect(a, 'a', b, 'a') + assert list(flow1._graph.edges()) == [] + + +def test_workflow_add(): + n1 = pe.Node(niu.IdentityInterface(fields=['a', 'b']), name='n1') + n2 = pe.Node(niu.IdentityInterface(fields=['c', 'd']), name='n2') + n3 = pe.Node(niu.IdentityInterface(fields=['c', 'd']), name='n1') + w1 = pe.Workflow(name='test') + w1.connect(n1, 'a', n2, 'c') + for node in [n1, n2, n3]: + with pytest.raises(IOError): + w1.add_nodes([node]) + with pytest.raises(IOError): + w1.connect([(w1, n2, [('n1.a', 'd')])]) + + +def test_doubleconnect(): + a = pe.Node(niu.IdentityInterface(fields=['a', 'b']), name='a') + b = pe.Node(niu.IdentityInterface(fields=['a', 'b']), name='b') + flow1 = pe.Workflow(name='test') + flow1.connect(a, 'a', b, 'a') + with pytest.raises(Exception) as excinfo: + flow1.connect(a, 'b', b, 'a') + assert "Trying to connect" in str(excinfo.value) + + c = pe.Node(niu.IdentityInterface(fields=['a', 'b']), name='c') + flow1 = pe.Workflow(name='test2') + with pytest.raises(Exception) as excinfo: + flow1.connect([(a, c, [('b', 'b')]), (b, c, [('a', 'b')])]) + assert "Trying to connect" in str(excinfo.value) def test_duplicate_node_check(): @@ -34,3 +109,123 @@ def test_duplicate_node_check(): with pytest.raises(IOError) as excinfo: wf.connect(wf_connections) assert 'Duplicate node name "selector3" found.' 
== str(excinfo.value)
+
+
+def _test_function(arg1):
+    import os
+    file1 = os.path.join(os.getcwd(), 'file1.txt')
+    file2 = os.path.join(os.getcwd(), 'file2.txt')
+    file3 = os.path.join(os.getcwd(), 'file3.txt')
+    file4 = os.path.join(os.getcwd(), 'subdir', 'file4.txt')
+    os.mkdir("subdir")
+    for filename in [file1, file2, file3, file4]:
+        with open(filename, 'wt') as fp:
+            fp.write('%d' % arg1)
+    return file1, file2, os.path.join(os.getcwd(), "subdir")
+
+
+def _test_function2(in_file, arg):
+    import os
+    with open(in_file, 'rt') as fp:
+        in_arg = fp.read()
+
+    file1 = os.path.join(os.getcwd(), 'file1.txt')
+    file2 = os.path.join(os.getcwd(), 'file2.txt')
+    file3 = os.path.join(os.getcwd(), 'file3.txt')
+    files = [file1, file2, file3]
+    for filename in files:
+        with open(filename, 'wt') as fp:
+            fp.write('%d' % arg + in_arg)
+    return file1, file2, 1
+
+
+def _test_function3(arg):
+    return arg
+
+
+@pytest.mark.parametrize(
+    'plugin, remove_unnecessary_outputs, keep_inputs',
+    list(product(['Linear', 'MultiProc'], [False, True], [True, False])))
+def test_outputs_removal_wf(tmpdir, plugin, remove_unnecessary_outputs,
+                            keep_inputs):
+    config.set_default_config()
+    config.set('execution', 'remove_unnecessary_outputs',
+               remove_unnecessary_outputs)
+    config.set('execution', 'keep_inputs', keep_inputs)
+
+    n1 = pe.Node(
+        niu.Function(
+            output_names=['out_file1', 'out_file2', 'dir'],
+            function=_test_function),
+        name='n1',
+        base_dir=tmpdir.strpath)
+    n1.inputs.arg1 = 1
+
+    n2 = pe.Node(
+        niu.Function(
+            output_names=['out_file1', 'out_file2', 'n'],
+            function=_test_function2),
+        name='n2',
+        base_dir=tmpdir.strpath)
+    n2.inputs.arg = 2
+
+    n3 = pe.Node(
+        niu.Function(
+            output_names=['n'],
+            function=_test_function3),
+        name='n3',
+        base_dir=tmpdir.strpath)
+
+    wf = pe.Workflow(
+        name="node_rem_test" + plugin, base_dir=tmpdir.strpath)
+
+    wf.connect(n1, "out_file1", n2, "in_file")
+    wf.run(plugin=plugin)
+
+    # Necessary outputs HAVE to exist
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n1.name, 'file1.txt'))
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n2.name, 'file1.txt'))
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n2.name, 'file2.txt'))
+
+    # Unnecessary outputs exist iff remove_unnecessary_outputs is False
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n1.name,
+                     'file2.txt')) is not remove_unnecessary_outputs
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n1.name, "subdir",
+                     'file4.txt')) is not remove_unnecessary_outputs
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n1.name,
+                     'file3.txt')) is not remove_unnecessary_outputs
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n2.name,
+                     'file3.txt')) is not remove_unnecessary_outputs
+
+    n4 = pe.Node(UtilsTestInterface(), name='n4', base_dir=tmpdir.strpath)
+    wf.connect(n2, "out_file1", n4, "in_file")
+
+    def pick_first(l):
+        return l[0]
+
+    wf.connect(n4, ("output1", pick_first), n3, "arg")
+    rmtree(os.path.join(wf.base_dir, wf.name))
+    wf.run(plugin=plugin)
+
+    # Test necessary outputs
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n1.name, 'file1.txt'))
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n2.name, 'file1.txt'))
+
+    # Test unnecessary outputs
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n2.name,
+                     'file2.txt')) is not remove_unnecessary_outputs
+
+    # Test keep_inputs
+    assert os.path.exists(
+        os.path.join(wf.base_dir, wf.name, n4.name,
+                     'file1.txt')) is 
keep_inputs diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index b2bb7f7c75..d42cc1cbe6 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -1015,10 +1015,8 @@ def make_field_func(*pair): # collect the subnodes to expand subnodes = [s for s in dfs_preorder(graph_in, inode)] - prior_prefix = [] - for s in subnodes: - prior_prefix.extend(re.findall('\.(.)I', s._id)) - prior_prefix = sorted(prior_prefix) + prior_prefix = [re.findall(r'\.(.)I', s._id) for s in subnodes if s._id] + prior_prefix = sorted([l for item in prior_prefix for l in item]) if not prior_prefix: iterable_prefix = 'a' else: @@ -1029,7 +1027,7 @@ def make_field_func(*pair): logger.debug(('subnodes:', subnodes)) # append a suffix to the iterable node id - inode._id += ('.' + iterable_prefix + 'I') + inode._id += '.%sI' % iterable_prefix # merge the iterated subgraphs # dj: the behaviour of .copy changes in version 2 diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index b1f7e4a173..c6a9047337 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -66,7 +66,6 @@ def __init__(self, name, base_dir=None): """ super(Workflow, self).__init__(name, base_dir) self._graph = nx.DiGraph() - self.config = deepcopy(config._sections) # PUBLIC API def clone(self, name): diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index f37bcff883..c757c3859c 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -284,7 +284,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): if len(jobids) > 0: # send all available jobs - logger.info('Pending[%d] Submitting[%d] jobs Slots[%d]', + logger.info('Pending[%d] Submitting[%d] jobs Slots[%s]', num_jobs, len(jobids[:slots]), slots or 'inf') for jobid in jobids[:slots]: diff --git a/nipype/workflows/smri/freesurfer/recon.py b/nipype/workflows/smri/freesurfer/recon.py index ff0c122ede..05018fd931 100644 --- a/nipype/workflows/smri/freesurfer/recon.py +++ b/nipype/workflows/smri/freesurfer/recon.py @@ -10,7 +10,9 @@ from ....interfaces.freesurfer import AddXFormToHeader, Info from ....interfaces.io import DataSink from .utils import getdefaultconfig -from ....pipeline.engine.base import logger +from .... import logging + +logger = logging.getLogger('workflow') def create_skullstripped_recon_flow(name="skullstripped_recon_all"):
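
Reviewer note (illustrative, not part of the patch): the sketch below mirrors the new tests in test_base.py and summarizes the contract the reworked EngineBase enforces after this change: name validation against ^[\w-]+$ moved into the name setter, fullname composed from _hierarchy, __str__ delegating to fullname, and clone() rejecting a reused name. It assumes this branch of nipype is importable; the variable names are made up for the example.

    # Minimal sketch of the reworked EngineBase API (mirrors test_base.py).
    from nipype.pipeline.engine.base import EngineBase

    node = EngineBase(name='valid-node0')  # name must match ^[\w-]+$
    node._hierarchy = 'outer.inner'        # normally set by the enclosing Workflow
    assert node.fullname == 'outer.inner.valid-node0'
    assert str(node) == node.fullname      # __str__ now delegates to fullname

    copy = node.clone('fresh-name')        # deepcopy followed by a validated rename
    assert copy.name == 'fresh-name'

    try:
        node.clone('valid-node0')          # reusing the current name is rejected
    except ValueError as err:
        print(err)                         # Cloning requires a new name, ...

    try:
        EngineBase(name='in/valid')        # invalid characters are rejected too
    except ValueError as err:
        print(err)

The Slots[%d] -> Slots[%s] change in nipype/pipeline/plugins/base.py is in the same spirit: slots may be None when a plugin imposes no limit, in which case the argument "slots or 'inf'" evaluates to the string 'inf', which only a %s placeholder can format without raising TypeError.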