From 78c792d1aae2d1d1b406f5d0e5dabba990e3e167 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Sat, 7 Mar 2020 12:23:43 +0100 Subject: [PATCH 1/4] Re-write _check_inputs and _check_outputs to not require full inputs/outputs computation --- nipype/pipeline/engine/workflows.py | 44 ++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index d1fde0ba32..13cd611d59 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -770,16 +770,46 @@ def _check_nodes(self, nodes): def _has_attr(self, parameter, subtype="in"): """Checks if a parameter is available as an input or output """ + hierarchy = parameter.split(".") + attrname = hierarchy.pop() + nodename = hierarchy.pop() + + targetworkflow = self + for workflowname in hierarchy: + workflow = None + for node in targetworkflow._graph.nodes(): + if node.name == workflowname: + if isinstance(node, Workflow): + workflow = node + break + if workflow is None: + return False + targetworkflow = workflow + + targetnode = None + for node in targetworkflow._graph.nodes(): + if node.name == nodename: + if isinstance(node, Workflow): + return False + else: + targetnode = node + break + if targetnode is None: + return False + if subtype == "in": - subobject = self.inputs + if not hasattr(node.inputs, attrname): + return False else: - subobject = self.outputs - attrlist = parameter.split(".") - cur_out = subobject - for attr in attrlist: - if not hasattr(cur_out, attr): + if not hasattr(node.outputs, attrname): return False - cur_out = getattr(cur_out, attr) + + if subtype == "in": + for _, _, d in targetworkflow._graph.in_edges(nbunch=targetnode, data=True): + for cd in d["connect"]: + if attrname == cd[1]: + return False + return True def _get_parameter_node(self, parameter, subtype="in"): From 69e107f4da45b4bf3f31814e5b0d1b43e08cf29a Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Mon, 9 Mar 2020 12:24:14 +0100 Subject: [PATCH 2/4] Add my name to contributors --- .zenodo.json | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.zenodo.json b/.zenodo.json index 4471e71e7a..92c6f3ed17 100644 --- a/.zenodo.json +++ b/.zenodo.json @@ -157,6 +157,11 @@ "name": "De La Vega, Alejandro", "orcid": "0000-0001-9062-3778" }, + { + "affiliation": "Charite Universitatsmedizin Berlin, Germany", + "name": "Waller, Lea", + "orcid": "0000-0002-3239-6957" + }, { "affiliation": "MIT", "name": "Kaczmarzyk, Jakub", From 771ab93a0d822ab6c68bf52ea840910ce3a860ae Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Mon, 9 Mar 2020 13:22:56 +0100 Subject: [PATCH 3/4] Apply suggestions from code review --- nipype/pipeline/engine/workflows.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 13cd611d59..a12c48a5fb 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -798,10 +798,10 @@ def _has_attr(self, parameter, subtype="in"): return False if subtype == "in": - if not hasattr(node.inputs, attrname): + if not hasattr(targetnode.inputs, attrname): return False else: - if not hasattr(node.outputs, attrname): + if not hasattr(targetnode.outputs, attrname): return False if subtype == "in": From 03729829776277c9f3e10da7144590f209705d64 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Wed, 11 Mar 2020 18:41:55 +0100 Subject: [PATCH 4/4] Fix double connect on nested workflows --- .../pipeline/engine/tests/test_workflows.py | 16 ++++++++++ nipype/pipeline/engine/workflows.py | 32 ++++++++++++++++--- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/nipype/pipeline/engine/tests/test_workflows.py b/nipype/pipeline/engine/tests/test_workflows.py index 75f77525f8..c6170f7ba8 100644 --- a/nipype/pipeline/engine/tests/test_workflows.py +++ b/nipype/pipeline/engine/tests/test_workflows.py @@ -83,6 +83,22 @@ def test_doubleconnect(): assert "Trying to connect" in str(excinfo.value) +def test_nested_workflow_doubleconnect(): + # double input with nested workflows + a = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="a") + b = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="b") + c = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="c") + flow1 = pe.Workflow(name="test1") + flow2 = pe.Workflow(name="test2") + flow3 = pe.Workflow(name="test3") + flow1.add_nodes([b]) + flow2.connect(a, "a", flow1, "b.a") + with pytest.raises(Exception) as excinfo: + flow3.connect(c, "a", flow2, "test1.b.a") + assert "Some connections were not found" in str(excinfo.value) + flow3.connect(c, "b", flow2, "test1.b.b") + + def test_duplicate_node_check(): wf = pe.Workflow(name="testidentity") diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index a12c48a5fb..bc532ddf90 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -771,11 +771,25 @@ def _has_attr(self, parameter, subtype="in"): """Checks if a parameter is available as an input or output """ hierarchy = parameter.split(".") + + # Connecting to a workflow needs at least two values, + # the name of the child node and the name of the input/output + if len(hierarchy) < 2: + return False + attrname = hierarchy.pop() nodename = hierarchy.pop() + def _check_is_already_connected(workflow, node, attrname): + for _, _, d in workflow._graph.in_edges(nbunch=node, data=True): + for cd in d["connect"]: + if attrname == cd[1]: + return False + return True + targetworkflow = self - for workflowname in hierarchy: + while hierarchy: + workflowname = hierarchy.pop(0) workflow = None for node in targetworkflow._graph.nodes(): if node.name == workflowname: @@ -784,6 +798,13 @@ def _has_attr(self, parameter, subtype="in"): break if workflow is None: return False + # Verify input does not already have an incoming connection + # in the hierarchy of workflows + if subtype == "in": + hierattrname = ".".join(hierarchy + [nodename, attrname]) + if not _check_is_already_connected( + targetworkflow, workflow, hierattrname): + return False targetworkflow = workflow targetnode = None @@ -804,11 +825,12 @@ def _has_attr(self, parameter, subtype="in"): if not hasattr(targetnode.outputs, attrname): return False + # Verify input does not already have an incoming connection + # in the target workflow if subtype == "in": - for _, _, d in targetworkflow._graph.in_edges(nbunch=targetnode, data=True): - for cd in d["connect"]: - if attrname == cd[1]: - return False + if not _check_is_already_connected( + targetworkflow, targetnode, attrname): + return False return True