From 92f6401258eae11e6a0086fcd34652f33197c550 Mon Sep 17 00:00:00 2001 From: oesteban Date: Thu, 6 Apr 2017 11:33:01 -0700 Subject: [PATCH 1/2] [ENH] Migrating resource handler to Node level This PR: - Sets the default `estimated_memory_gb` to 0 (most of the nodes of a workflow will never reach 1GB) - Makes two new arguments available to creating Nodes (n_procs and mem_gb), that will set those two values in the inner interface (no need to call node.interface.num_threads anymore). - Fixes a small error formating one Exception. TODO list: - Remove estimated_memory_gb and num_threads from the BaseInterface - Find a way to synchronize the Node level num_threads and the inner interface inputs (maybe a new metadata threads=True to identify these inputs?) - Resource handling of MapNodes (check if these changes would affect those). --- nipype/interfaces/base.py | 2 +- nipype/pipeline/engine/nodes.py | 8 +++++++- nipype/pipeline/plugins/multiproc.py | 11 ++++++----- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index 62ea7851ac..f9a78043a2 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -774,7 +774,7 @@ def __init__(self, from_file=None, **inputs): self.__class__.__name__) self.inputs = self.input_spec(**inputs) - self.estimated_memory_gb = 1 + self.estimated_memory_gb = 0 self.num_threads = 1 if from_file is not None: diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 71331d38f5..cbfa70cebb 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -79,7 +79,8 @@ class Node(EngineBase): def __init__(self, interface, name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, - run_without_submitting=False, **kwargs): + run_without_submitting=False, n_procs=1, mem_gb=None, + **kwargs): """ Parameters ---------- @@ -168,6 +169,11 @@ def __init__(self, interface, name, iterables=None, itersource=None, self.input_source = {} self.needed_outputs = [] self.plugin_args = {} + + self._interface.num_threads = n_procs + if mem_gb is not None: + self._interface.estimated_memory_gb = mem_gb + if needed_outputs: self.needed_outputs = sorted(needed_outputs) self._got_inputs = False diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 0465b4f880..3994f2e1cd 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -231,11 +231,12 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): busy_processors += self.procs[jobid]._interface.num_threads else: - raise ValueError("Resources required by jobid %d (%f GB, %d threads)" - "exceed what is available on the system (%f GB, %d threads)"%(jobid, - self.procs[jobid].__interface.estimated_memory_gb, - self.procs[jobid].__interface.num_threads, - self.memory_gb,self.processors)) + raise ValueError( + "Resources required by jobid {0} ({3}GB, {4} threads) exceed what is " + "available on the system ({1}GB, {2} threads)".format( + jobid, self.memory_gb, self.processors, + self.procs[jobid]._interface.estimated_memory_gb, + self.procs[jobid]._interface.num_threads)) free_memory_gb = self.memory_gb - busy_memory_gb free_processors = self.processors - busy_processors From b8120312b323d4ac06ffb3483f452d76729a3146 Mon Sep 17 00:00:00 2001 From: oesteban Date: Thu, 6 Apr 2017 11:58:05 -0700 Subject: [PATCH 2/2] set memory minimum to 0.25 GB --- nipype/interfaces/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index f9a78043a2..b12289ea2f 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -774,7 +774,7 @@ def __init__(self, from_file=None, **inputs): self.__class__.__name__) self.inputs = self.input_spec(**inputs) - self.estimated_memory_gb = 0 + self.estimated_memory_gb = 0.25 self.num_threads = 1 if from_file is not None: