diff --git a/.zenodo.json b/.zenodo.json index 4471e71e7a..92c6f3ed17 100644 --- a/.zenodo.json +++ b/.zenodo.json @@ -157,6 +157,11 @@ "name": "De La Vega, Alejandro", "orcid": "0000-0001-9062-3778" }, + { + "affiliation": "Charite Universitatsmedizin Berlin, Germany", + "name": "Waller, Lea", + "orcid": "0000-0002-3239-6957" + }, { "affiliation": "MIT", "name": "Kaczmarzyk, Jakub", diff --git a/nipype/pipeline/engine/tests/test_workflows.py b/nipype/pipeline/engine/tests/test_workflows.py index 75f77525f8..c6170f7ba8 100644 --- a/nipype/pipeline/engine/tests/test_workflows.py +++ b/nipype/pipeline/engine/tests/test_workflows.py @@ -83,6 +83,22 @@ def test_doubleconnect(): assert "Trying to connect" in str(excinfo.value) +def test_nested_workflow_doubleconnect(): + # double input with nested workflows + a = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="a") + b = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="b") + c = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="c") + flow1 = pe.Workflow(name="test1") + flow2 = pe.Workflow(name="test2") + flow3 = pe.Workflow(name="test3") + flow1.add_nodes([b]) + flow2.connect(a, "a", flow1, "b.a") + with pytest.raises(Exception) as excinfo: + flow3.connect(c, "a", flow2, "test1.b.a") + assert "Some connections were not found" in str(excinfo.value) + flow3.connect(c, "b", flow2, "test1.b.b") + + def test_duplicate_node_check(): wf = pe.Workflow(name="testidentity") diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index d1fde0ba32..bc532ddf90 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -770,16 +770,68 @@ def _check_nodes(self, nodes): def _has_attr(self, parameter, subtype="in"): """Checks if a parameter is available as an input or output """ + hierarchy = parameter.split(".") + + # Connecting to a workflow needs at least two values, + # the name of the child node and the name of the input/output + if len(hierarchy) < 2: + return False + + attrname = hierarchy.pop() + nodename = hierarchy.pop() + + def _check_is_already_connected(workflow, node, attrname): + for _, _, d in workflow._graph.in_edges(nbunch=node, data=True): + for cd in d["connect"]: + if attrname == cd[1]: + return False + return True + + targetworkflow = self + while hierarchy: + workflowname = hierarchy.pop(0) + workflow = None + for node in targetworkflow._graph.nodes(): + if node.name == workflowname: + if isinstance(node, Workflow): + workflow = node + break + if workflow is None: + return False + # Verify input does not already have an incoming connection + # in the hierarchy of workflows + if subtype == "in": + hierattrname = ".".join(hierarchy + [nodename, attrname]) + if not _check_is_already_connected( + targetworkflow, workflow, hierattrname): + return False + targetworkflow = workflow + + targetnode = None + for node in targetworkflow._graph.nodes(): + if node.name == nodename: + if isinstance(node, Workflow): + return False + else: + targetnode = node + break + if targetnode is None: + return False + if subtype == "in": - subobject = self.inputs + if not hasattr(targetnode.inputs, attrname): + return False else: - subobject = self.outputs - attrlist = parameter.split(".") - cur_out = subobject - for attr in attrlist: - if not hasattr(cur_out, attr): + if not hasattr(targetnode.outputs, attrname): return False - cur_out = getattr(cur_out, attr) + + # Verify input does not already have an incoming connection + # in the target workflow + if subtype == "in": + if not _check_is_already_connected( + targetworkflow, targetnode, attrname): + return False + return True def _get_parameter_node(self, parameter, subtype="in"):