From e2a996e60328ec1397f44505bc5fd3a7cbaf1111 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Wed, 28 Apr 2021 11:27:46 +0200 Subject: [PATCH 01/11] Improve performance for _has_node - Networkx represents graphs as a dictionary where the nodes are the keys. As such, we can use the built-in __contains__ function for fast lookup - We can also iterate the graphs directly without constructing a NodeView using self._graph.nodes() --- nipype/pipeline/engine/workflows.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 9b6e60ffaf..30f61a81ae 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -912,10 +912,12 @@ def _get_all_nodes(self): return allnodes def _has_node(self, wanted_node): - for node in self._graph.nodes(): + if wanted_node in self._graph: + return True # best case scenario + for node in self._graph: # iterate otherwise if wanted_node == node: return True - if isinstance(node, Workflow): + if hasattr(node, "_has_node"): # hasattr is faster than isinstance if node._has_node(wanted_node): return True return False From 176cff010b7d30f96f573f620df4fd020f83ba10 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Wed, 28 Apr 2021 20:14:42 +0200 Subject: [PATCH 02/11] Use set instead of np.unique to avoid sorting - Also avoids casting the list to np.array --- nipype/pipeline/engine/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index f77f771ea7..3557475ffe 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -753,7 +753,7 @@ def _merge_graphs( # nodes of the supergraph. 
supernodes = supergraph.nodes() ids = [n._hierarchy + n._id for n in supernodes] - if len(np.unique(ids)) != len(ids): + if len(set(ids)) != len(ids): # This should trap the problem of miswiring when multiple iterables are # used at the same level. The use of the template below for naming # updates to nodes is the general solution. From 1c789ad95f88e8401c8e19d6f94fe4ac162d4825 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Thu, 29 Apr 2021 09:41:43 +0200 Subject: [PATCH 03/11] Add a cache for nested workflows - Update every time we connect/disconnect or add/remove a node - Keep track of which nodes are workflows and which are not - As a result, we do not need to iterate all nodes to determine the result of `_has_node`, we can use O(1) set operations --- nipype/pipeline/engine/workflows.py | 44 ++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 30f61a81ae..69ab287536 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -59,6 +59,9 @@ def __init__(self, name, base_dir=None): super(Workflow, self).__init__(name, base_dir) self._graph = nx.DiGraph() + self._nodes_cache = set() + self._nested_workflows_cache = set() + # PUBLIC API def clone(self, name): """Clone a workflow @@ -269,6 +272,8 @@ def connect(self, *args, **kwargs): "(%s, %s): new edge data: %s", srcnode, destnode, str(edge_data) ) + self._update_node_cache() + def disconnect(self, *args): """Disconnect nodes See the docstring for connect for format. 
@@ -314,6 +319,8 @@ def disconnect(self, *args): else: self._graph.add_edges_from([(srcnode, dstnode, edge_data)]) + self._update_node_cache() + def add_nodes(self, nodes): """ Add nodes to a workflow @@ -346,6 +353,7 @@ def add_nodes(self, nodes): if node._hierarchy is None: node._hierarchy = self.name self._graph.add_nodes_from(newnodes) + self._update_node_cache() def remove_nodes(self, nodes): """ Remove nodes from a workflow @@ -356,6 +364,7 @@ def remove_nodes(self, nodes): A list of EngineBase-based objects """ self._graph.remove_nodes_from(nodes) + self._update_node_cache() # Input-Output access @property @@ -903,23 +912,32 @@ def _set_node_input(self, node, param, source, sourceinfo): node.set_input(param, deepcopy(newval)) def _get_all_nodes(self): - allnodes = [] - for node in self._graph.nodes(): - if isinstance(node, Workflow): - allnodes.extend(node._get_all_nodes()) - else: - allnodes.append(node) + allnodes = [ + *self._nodes_cache.difference(self._nested_workflows_cache) + ] # all nodes that are not workflows + for node in self._nested_workflows_cache: + allnodes.extend(node._get_all_nodes()) return allnodes + def _update_node_cache(self): + nodes = set(self._graph) + + added_nodes = nodes.difference(self._nodes_cache) + removed_nodes = self._nodes_cache.difference(nodes) + + self._nodes_cache = nodes + self._nested_workflows_cache.difference_update(removed_nodes) + + for node in added_nodes: + if isinstance(node, Workflow): + self._nested_workflows_cache.add(node) + def _has_node(self, wanted_node): - if wanted_node in self._graph: - return True # best case scenario - for node in self._graph: # iterate otherwise - if wanted_node == node: + if wanted_node in self._nodes_cache: + return True + for node in self._nested_workflows_cache: + if node._has_node(wanted_node): return True - if hasattr(node, "_has_node"): # hasattr is faster than isinstance - if node._has_node(wanted_node): - return True return False def _create_flat_graph(self): From 
bc85f3e7a4c2f9b5f7013938c07b7f3e94f66c58 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Thu, 29 Apr 2021 09:43:21 +0200 Subject: [PATCH 04/11] Use set for newnodes to make adding faster - Do not need to loop over entries - Faster O(1) __contains__ --- nipype/pipeline/engine/workflows.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 69ab287536..eb73afc2ee 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -144,7 +144,7 @@ def connect(self, *args, **kwargs): self.disconnect(connection_list) return - newnodes = [] + newnodes = set() for srcnode, destnode, _ in connection_list: if self in [srcnode, destnode]: msg = ( @@ -154,9 +154,9 @@ def connect(self, *args, **kwargs): raise IOError(msg) if (srcnode not in newnodes) and not self._has_node(srcnode): - newnodes.append(srcnode) + newnodes.add(srcnode) if (destnode not in newnodes) and not self._has_node(destnode): - newnodes.append(destnode) + newnodes.add(destnode) if newnodes: self._check_nodes(newnodes) for node in newnodes: From e9fb94e97e9428b108b01df859cf3f9f95c31968 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Thu, 29 Apr 2021 09:44:08 +0200 Subject: [PATCH 05/11] Use sets for connected_ports - Faster operations for update and contains --- nipype/pipeline/engine/workflows.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index eb73afc2ee..1498b07c55 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -166,15 +166,16 @@ def connect(self, *args, **kwargs): connected_ports = {} for srcnode, destnode, connects in connection_list: if destnode not in connected_ports: - connected_ports[destnode] = [] + connected_ports[destnode] = set() # check to see which ports of destnode are already # connected. 
if not disconnect and (destnode in self._graph.nodes()): for edge in self._graph.in_edges(destnode): data = self._graph.get_edge_data(*edge) - for sourceinfo, destname in data["connect"]: - if destname not in connected_ports[destnode]: - connected_ports[destnode] += [destname] + connected_ports[destnode].update( + destname + for _, destname in data["connect"] + ) for source, dest in connects: # Currently datasource/sink/grabber.io modules # determine their inputs/outputs depending on @@ -229,7 +230,7 @@ def connect(self, *args, **kwargs): ) if sourcename and not srcnode._check_outputs(sourcename): not_found.append(["out", srcnode.name, sourcename]) - connected_ports[destnode] += [dest] + connected_ports[destnode].add(dest) infostr = [] for info in not_found: infostr += [ From 3fa52a93ed5cbb90ee9603116514972e67769cc3 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Fri, 30 Apr 2021 09:43:49 +0200 Subject: [PATCH 06/11] Remove topological sort for _generate_flatgraph - The function is only adding/removing nodes and adjusting their connections - As such, there are no serial dependencies, and we can iterate in any order --- nipype/pipeline/engine/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 1498b07c55..0a8000bfda 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -970,7 +970,7 @@ def _generate_flatgraph(self): raise Exception( ("Workflow: %s is not a directed acyclic graph " "(DAG)") % self.name ) - nodes = list(nx.topological_sort(self._graph)) + nodes = list(self._graph.nodes) for node in nodes: logger.debug("processing node: %s", node) if isinstance(node, Workflow): From 49685aa8edb05afd2e11f1abb7ce5059404b7ae5 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Fri, 30 Apr 2021 09:47:26 +0200 Subject: [PATCH 07/11] Remove regular expression re-compiles - The only thing that changes about the regular expression is 
the prefix - We can also detect the prefix with startswith, and then use the same regular expression across the loop - This means that Python can cache the compiled regex internally, and we save some time --- nipype/pipeline/engine/utils.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index 3557475ffe..de769607c2 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -1100,11 +1100,12 @@ def make_field_func(*pair): old_edge_dict = jedge_dict[jnode] # the edge source node replicates expansions = defaultdict(list) - for node in graph_in.nodes(): + for node in graph_in: for src_id in list(old_edge_dict.keys()): # Drop the original JoinNodes; only concerned with # generated Nodes - if hasattr(node, "joinfield") and node.itername == src_id: + itername = node.itername + if hasattr(node, "joinfield") and itername == src_id: continue # Patterns: # - src_id : Non-iterable node @@ -1113,10 +1114,12 @@ def make_field_func(*pair): # - src_id.[a-z]I.[a-z]\d+ : # Non-IdentityInterface w/ iterables # - src_idJ\d+ : JoinNode(IdentityInterface) - if re.match( - src_id + r"((\.[a-z](I\.[a-z])?|J)\d+)?$", node.itername - ): - expansions[src_id].append(node) + if itername.startswith(src_id): + itername = itername[len(src_id):] + if re.fullmatch( + r"((\.[a-z](I\.[a-z])?|J)\d+)?", itername + ): + expansions[src_id].append(node) for in_id, in_nodes in list(expansions.items()): logger.debug( "The join node %s input %s was expanded" " to %d nodes.", From 54aa6adb7b2670fb7361687e1e83db171cfd407e Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Fri, 30 Apr 2021 17:14:46 +0200 Subject: [PATCH 08/11] Apply suggestions from code review Co-authored-by: Chris Markiewicz --- nipype/pipeline/engine/utils.py | 6 ++---- nipype/pipeline/engine/workflows.py | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/nipype/pipeline/engine/utils.py 
b/nipype/pipeline/engine/utils.py index de769607c2..4e8b6d2e8c 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -1115,10 +1115,8 @@ def make_field_func(*pair): # Non-IdentityInterface w/ iterables # - src_idJ\d+ : JoinNode(IdentityInterface) if itername.startswith(src_id): - itername = itername[len(src_id):] - if re.fullmatch( - r"((\.[a-z](I\.[a-z])?|J)\d+)?", itername - ): + suffix = itername[len(src_id):] + if re.fullmatch(r"((\.[a-z](I\.[a-z])?|J)\d+)?", suffix): expansions[src_id].append(node) for in_id, in_nodes in list(expansions.items()): logger.debug( diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 0a8000bfda..30878b9b12 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -913,11 +913,9 @@ def _set_node_input(self, node, param, source, sourceinfo): node.set_input(param, deepcopy(newval)) def _get_all_nodes(self): - allnodes = [ - *self._nodes_cache.difference(self._nested_workflows_cache) - ] # all nodes that are not workflows + allnodes = self._nodes_cache - self._nested_workflows_cache for node in self._nested_workflows_cache: - allnodes.extend(node._get_all_nodes()) + allnodes |= node._get_all_nodes() return allnodes def _update_node_cache(self): From 0b49a70a007d9de7387b63db20dcea43fb263d49 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Fri, 30 Apr 2021 17:25:50 +0200 Subject: [PATCH 09/11] Apply formatting suggestion from code review --- nipype/pipeline/engine/workflows.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 30878b9b12..4dd204fba2 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -932,12 +932,13 @@ def _update_node_cache(self): self._nested_workflows_cache.add(node) def _has_node(self, wanted_node): - if wanted_node in self._nodes_cache: - return True - for node 
in self._nested_workflows_cache: - if node._has_node(wanted_node): - return True - return False + return ( + wanted_node in self._nodes_cache or + any( + wf._has_node(wanted_node) + for wf in self._nested_workflows_cache + ) + ) def _create_flat_graph(self): """Make a simple DAG where no node is a workflow.""" From 672a2340ee09fbb05b808d454f9038548569a312 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Fri, 30 Apr 2021 18:42:20 +0200 Subject: [PATCH 10/11] Add suggestion from code review for `add_nodes` --- nipype/pipeline/engine/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 4dd204fba2..32be25f003 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -333,7 +333,7 @@ def add_nodes(self, nodes): newnodes = [] all_nodes = self._get_all_nodes() for node in nodes: - if self._has_node(node): + if node in all_nodes: raise IOError("Node %s already exists in the workflow" % node) if isinstance(node, Workflow): for subnode in node._get_all_nodes(): From 0c485d3d1a5511421d85e1f4a44794b254932063 Mon Sep 17 00:00:00 2001 From: Lea Waller Date: Fri, 30 Apr 2021 22:05:11 +0200 Subject: [PATCH 11/11] Remove unnecessary calls to `_update_node_cache` - Apply suggestions from code review Co-authored-by: Chris Markiewicz --- nipype/pipeline/engine/workflows.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 32be25f003..184cfd5a57 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -273,7 +273,8 @@ def connect(self, *args, **kwargs): "(%s, %s): new edge data: %s", srcnode, destnode, str(edge_data) ) - self._update_node_cache() + if newnodes: + self._update_node_cache() def disconnect(self, *args): """Disconnect nodes @@ -320,8 +321,6 @@ def disconnect(self, *args): else: 
self._graph.add_edges_from([(srcnode, dstnode, edge_data)]) - self._update_node_cache() - def add_nodes(self, nodes): """ Add nodes to a workflow