Skip to content

[ENH] Workflow connect performance #3184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .zenodo.json
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@
"name": "De La Vega, Alejandro",
"orcid": "0000-0001-9062-3778"
},
{
"affiliation": "Charite Universitatsmedizin Berlin, Germany",
"name": "Waller, Lea",
"orcid": "0000-0002-3239-6957"
},
{
"affiliation": "MIT",
"name": "Kaczmarzyk, Jakub",
Expand Down
16 changes: 16 additions & 0 deletions nipype/pipeline/engine/tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ def test_doubleconnect():
assert "Trying to connect" in str(excinfo.value)


def test_nested_workflow_doubleconnect():
# double input with nested workflows
a = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="a")
b = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="b")
c = pe.Node(niu.IdentityInterface(fields=["a", "b"]), name="c")
flow1 = pe.Workflow(name="test1")
flow2 = pe.Workflow(name="test2")
flow3 = pe.Workflow(name="test3")
flow1.add_nodes([b])
flow2.connect(a, "a", flow1, "b.a")
with pytest.raises(Exception) as excinfo:
flow3.connect(c, "a", flow2, "test1.b.a")
assert "Some connections were not found" in str(excinfo.value)
flow3.connect(c, "b", flow2, "test1.b.b")


def test_duplicate_node_check():

wf = pe.Workflow(name="testidentity")
Expand Down
66 changes: 59 additions & 7 deletions nipype/pipeline/engine/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,16 +770,68 @@ def _check_nodes(self, nodes):
def _has_attr(self, parameter, subtype="in"):
"""Checks if a parameter is available as an input or output
"""
hierarchy = parameter.split(".")

# Connecting to a workflow needs at least two values,
# the name of the child node and the name of the input/output
if len(hierarchy) < 2:
return False

attrname = hierarchy.pop()
nodename = hierarchy.pop()

def _check_is_already_connected(workflow, node, attrname):
for _, _, d in workflow._graph.in_edges(nbunch=node, data=True):
for cd in d["connect"]:
if attrname == cd[1]:
return False
return True

targetworkflow = self
while hierarchy:
workflowname = hierarchy.pop(0)
workflow = None
for node in targetworkflow._graph.nodes():
if node.name == workflowname:
if isinstance(node, Workflow):
workflow = node
break
if workflow is None:
return False
# Verify input does not already have an incoming connection
# in the hierarchy of workflows
if subtype == "in":
hierattrname = ".".join(hierarchy + [nodename, attrname])
if not _check_is_already_connected(
targetworkflow, workflow, hierattrname):
return False
targetworkflow = workflow

targetnode = None
for node in targetworkflow._graph.nodes():
if node.name == nodename:
if isinstance(node, Workflow):
return False
else:
targetnode = node
break
if targetnode is None:
return False

if subtype == "in":
subobject = self.inputs
if not hasattr(targetnode.inputs, attrname):
return False
else:
subobject = self.outputs
attrlist = parameter.split(".")
cur_out = subobject
for attr in attrlist:
if not hasattr(cur_out, attr):
if not hasattr(targetnode.outputs, attrname):
return False
cur_out = getattr(cur_out, attr)

# Verify input does not already have an incoming connection
# in the target workflow
if subtype == "in":
if not _check_is_already_connected(
targetworkflow, targetnode, attrname):
return False

return True

def _get_parameter_node(self, parameter, subtype="in"):
Expand Down