Skip to content

Commit 92f6401

Browse files
committed
[ENH] Migrating resource handler to Node level
This PR: - Sets the default `estimated_memory_gb` to 0 (most of the nodes of a workflow will never reach 1GB) - Makes two new arguments available to creating Nodes (n_procs and mem_gb), that will set those two values in the inner interface (no need to call node.interface.num_threads anymore). - Fixes a small error formating one Exception. TODO list: - Remove estimated_memory_gb and num_threads from the BaseInterface - Find a way to synchronize the Node level num_threads and the inner interface inputs (maybe a new metadata threads=True to identify these inputs?) - Resource handling of MapNodes (check if these changes would affect those).
1 parent d68b929 commit 92f6401

File tree

3 files changed

+14
-7
lines changed

3 files changed

+14
-7
lines changed

nipype/interfaces/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,7 @@ def __init__(self, from_file=None, **inputs):
774774
self.__class__.__name__)
775775

776776
self.inputs = self.input_spec(**inputs)
777-
self.estimated_memory_gb = 1
777+
self.estimated_memory_gb = 0
778778
self.num_threads = 1
779779

780780
if from_file is not None:

nipype/pipeline/engine/nodes.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ class Node(EngineBase):
7979

8080
def __init__(self, interface, name, iterables=None, itersource=None,
8181
synchronize=False, overwrite=None, needed_outputs=None,
82-
run_without_submitting=False, **kwargs):
82+
run_without_submitting=False, n_procs=1, mem_gb=None,
83+
**kwargs):
8384
"""
8485
Parameters
8586
----------
@@ -168,6 +169,11 @@ def __init__(self, interface, name, iterables=None, itersource=None,
168169
self.input_source = {}
169170
self.needed_outputs = []
170171
self.plugin_args = {}
172+
173+
self._interface.num_threads = n_procs
174+
if mem_gb is not None:
175+
self._interface.estimated_memory_gb = mem_gb
176+
171177
if needed_outputs:
172178
self.needed_outputs = sorted(needed_outputs)
173179
self._got_inputs = False

nipype/pipeline/plugins/multiproc.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -231,11 +231,12 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
231231
busy_processors += self.procs[jobid]._interface.num_threads
232232

233233
else:
234-
raise ValueError("Resources required by jobid %d (%f GB, %d threads)"
235-
"exceed what is available on the system (%f GB, %d threads)"%(jobid,
236-
self.procs[jobid].__interface.estimated_memory_gb,
237-
self.procs[jobid].__interface.num_threads,
238-
self.memory_gb,self.processors))
234+
raise ValueError(
235+
"Resources required by jobid {0} ({3}GB, {4} threads) exceed what is "
236+
"available on the system ({1}GB, {2} threads)".format(
237+
jobid, self.memory_gb, self.processors,
238+
self.procs[jobid]._interface.estimated_memory_gb,
239+
self.procs[jobid]._interface.num_threads))
239240

240241
free_memory_gb = self.memory_gb - busy_memory_gb
241242
free_processors = self.processors - busy_processors

0 commit comments

Comments
 (0)