Skip to content

Commit 9f3de6d

Browse files
committed
Removed resource_multiproc code so only new_interfaces code is left
1 parent d8119d3 commit 9f3de6d

File tree

15 files changed

+58
-826
lines changed

15 files changed

+58
-826
lines changed

nipype/interfaces/ants/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# -Using -1 gives primary responsibilty to ITKv4 to do the correct
1313
# thread limitings.
1414
# -Using 1 takes a very conservative approach to avoid overloading
15-
# the computer (when running ResourceMultiProc) by forcing everything to
15+
# the computer (when running MultiProc) by forcing everything to
1616
# single threaded. This can be a severe penalty for registration
1717
# performance.
1818
LOCAL_DEFAULT_NUMBER_OF_THREADS = 1

nipype/interfaces/base.py

Lines changed: 2 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -754,8 +754,6 @@ def __init__(self, **inputs):
754754
raise Exception('No input_spec in class: %s' %
755755
self.__class__.__name__)
756756
self.inputs = self.input_spec(**inputs)
757-
self.estimated_memory = 1
758-
self.num_threads = 1
759757

760758
@classmethod
761759
def help(cls, returnhelp=False):
@@ -1194,69 +1192,14 @@ def _read(self, drain):
11941192
self._lastidx = len(self._rows)
11951193

11961194

1197-
# Get number of threads for process
1198-
def _get_num_threads(proc):
1199-
'''
1200-
'''
1201-
1202-
# Import packages
1203-
import psutil
1204-
import logging as lg
1205-
1206-
# Init variables
1207-
num_threads = proc.num_threads()
1208-
try:
1209-
num_children = len(proc.children())
1210-
for child in proc.children():
1211-
num_threads = max(num_threads, num_children,
1212-
child.num_threads(), len(child.children()))
1213-
except psutil.NoSuchProcess:
1214-
pass
1215-
1216-
return num_threads
1217-
1218-
1219-
# Get max resources used for process
1220-
def _get_max_resources_used(proc, mem_mb, num_threads, poll=False):
1221-
'''
1222-
docstring
1223-
'''
1224-
1225-
# Import packages
1226-
from memory_profiler import _get_memory
1227-
import psutil
1228-
1229-
try:
1230-
mem_mb = max(mem_mb, _get_memory(proc.pid, include_children=True))
1231-
num_threads = max(num_threads, _get_num_threads(psutil.Process(proc.pid)))
1232-
if poll:
1233-
proc.poll()
1234-
except Exception as exc:
1235-
iflogger.info('Could not get resources used by process. Error: %s'\
1236-
% exc)
1237-
1238-
# Return resources
1239-
return mem_mb, num_threads
1240-
1241-
12421195
def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12431196
"""Run a command, read stdout and stderr, prefix with timestamp.
12441197
12451198
The returned runtime contains a merged stdout+stderr log with timestamps
12461199
"""
1247-
1248-
# Import packages
1249-
try:
1250-
import memory_profiler
1251-
import psutil
1252-
mem_prof = True
1253-
except:
1254-
mem_prof = False
1255-
1256-
# Init variables
12571200
PIPE = subprocess.PIPE
1258-
cmdline = runtime.cmdline
12591201

1202+
cmdline = runtime.cmdline
12601203
if redirect_x:
12611204
exist_xvfb, _ = _exists_in_path('xvfb-run', runtime.environ)
12621205
if not exist_xvfb:
@@ -1288,12 +1231,6 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False):
12881231
result = {}
12891232
errfile = os.path.join(runtime.cwd, 'stderr.nipype')
12901233
outfile = os.path.join(runtime.cwd, 'stdout.nipype')
1291-
1292-
# Init variables for memory profiling
1293-
mem_mb = -1
1294-
num_threads = -1
1295-
interval = 1
1296-
12971234
if output == 'stream':
12981235
streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)]
12991236

@@ -1309,10 +1246,8 @@ def _process(drain=0):
13091246
else:
13101247
for stream in res[0]:
13111248
stream.read(drain)
1249+
13121250
while proc.returncode is None:
1313-
if mem_prof:
1314-
mem_mb, num_threads = \
1315-
_get_max_resources_used(proc, mem_mb, num_threads)
13161251
proc.poll()
13171252
_process()
13181253
_process(drain=1)
@@ -1326,41 +1261,25 @@ def _process(drain=0):
13261261
result[stream._name] = [r[2] for r in rows]
13271262
temp.sort()
13281263
result['merged'] = [r[1] for r in temp]
1329-
13301264
if output == 'allatonce':
1331-
if mem_prof:
1332-
while proc.returncode is None:
1333-
mem_mb, num_threads = \
1334-
_get_max_resources_used(proc, mem_mb, num_threads, poll=True)
13351265
stdout, stderr = proc.communicate()
13361266
stdout = stdout.decode(default_encoding)
13371267
stderr = stderr.decode(default_encoding)
13381268
result['stdout'] = stdout.split('\n')
13391269
result['stderr'] = stderr.split('\n')
13401270
result['merged'] = ''
13411271
if output == 'file':
1342-
if mem_prof:
1343-
while proc.returncode is None:
1344-
mem_mb, num_threads = \
1345-
_get_max_resources_used(proc, mem_mb, num_threads, poll=True)
13461272
ret_code = proc.wait()
13471273
stderr.flush()
13481274
stdout.flush()
13491275
result['stdout'] = [line.decode(default_encoding).strip() for line in open(outfile, 'rb').readlines()]
13501276
result['stderr'] = [line.decode(default_encoding).strip() for line in open(errfile, 'rb').readlines()]
13511277
result['merged'] = ''
13521278
if output == 'none':
1353-
if mem_prof:
1354-
while proc.returncode is None:
1355-
mem_mb, num_threads = \
1356-
_get_max_resources_used(proc, mem_mb, num_threads, poll=True)
13571279
proc.communicate()
13581280
result['stdout'] = []
13591281
result['stderr'] = []
13601282
result['merged'] = ''
1361-
1362-
setattr(runtime, 'runtime_memory', mem_mb/1024.0)
1363-
setattr(runtime, 'runtime_threads', num_threads)
13641283
runtime.stderr = '\n'.join(result['stderr'])
13651284
runtime.stdout = '\n'.join(result['stdout'])
13661285
runtime.merged = result['merged']

nipype/pipeline/engine/nodes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252

5353
from ... import config, logging
5454
logger = logging.getLogger('workflow')
55-
5655
from ...interfaces.base import (traits, InputMultiPath, CommandLine,
5756
Undefined, TraitedSpec, DynamicTraitedSpec,
5857
Bunch, InterfaceResult, md5, Interface,
@@ -1152,7 +1151,8 @@ def _node_runner(self, nodes, updatehash=False):
11521151
if str2bool(self.config['execution']['stop_on_first_crash']):
11531152
self._result = node.result
11541153
raise
1155-
yield i, node, err
1154+
finally:
1155+
yield i, node, err
11561156

11571157
def _collate_results(self, nodes):
11581158
self._result = InterfaceResult(interface=[], runtime=[],

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -715,15 +715,15 @@ def func1(in1):
715715
w1.config['execution'] = {'stop_on_first_crash': 'true',
716716
'local_hash_check': 'true',
717717
'crashdump_dir': wd,
718-
'poll_sleep_duration' : 2}
718+
'poll_sleep_duration': 2}
719719

720720
# test output of num_subnodes method when serial is default (False)
721721
yield assert_equal, n1.num_subnodes(), len(n1.inputs.in1)
722722

723723
# test running the workflow on default conditions
724724
error_raised = False
725725
try:
726-
w1.run(plugin='ResourceMultiProc')
726+
w1.run(plugin='MultiProc')
727727
except Exception as e:
728728
from nipype.pipeline.engine.base import logger
729729
logger.info('Exception: %s' % str(e))
@@ -737,7 +737,7 @@ def func1(in1):
737737
# test running the workflow on serial conditions
738738
error_raised = False
739739
try:
740-
w1.run(plugin='ResourceMultiProc')
740+
w1.run(plugin='MultiProc')
741741
except Exception as e:
742742
from nipype.pipeline.engine.base import logger
743743
logger.info('Exception: %s' % str(e))

nipype/pipeline/engine/tests/test_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ def test_function3(arg):
214214

215215
out_dir = mkdtemp()
216216

217-
for plugin in ('Linear',): # , 'ResourceMultiProc'):
217+
for plugin in ('Linear',): # , 'MultiProc'):
218218
n1 = pe.Node(niu.Function(input_names=['arg1'],
219219
output_names=['out_file1', 'out_file2', 'dir'],
220220
function=test_function),

nipype/pipeline/plugins/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@
99
from .sge import SGEPlugin
1010
from .condor import CondorPlugin
1111
from .dagman import CondorDAGManPlugin
12-
from .multiproc import ResourceMultiProcPlugin
12+
from .multiproc import MultiProcPlugin
1313
from .ipython import IPythonPlugin
1414
from .somaflow import SomaFlowPlugin
1515
from .pbsgraph import PBSGraphPlugin
1616
from .sgegraph import SGEGraphPlugin
1717
from .lsf import LSFPlugin
1818
from .slurm import SLURMPlugin
1919
from .slurmgraph import SLURMGraphPlugin
20-
21-
from .callback_log import log_nodes_cb

nipype/pipeline/plugins/base.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import numpy as np
2121
import scipy.sparse as ssp
2222

23+
2324
from ...utils.filemanip import savepkl, loadpkl
2425
from ...utils.misc import str2bool
2526
from ..engine.utils import (nx, dfs_preorder, topological_sort)
@@ -245,7 +246,7 @@ def run(self, graph, config, updatehash=False):
245246
notrun.append(self._clean_queue(jobid, graph,
246247
result=result))
247248
else:
248-
self._task_finished_cb(jobid, result)
249+
self._task_finished_cb(jobid)
249250
self._remove_node_dirs()
250251
self._clear_task(taskid)
251252
else:
@@ -264,15 +265,10 @@ def run(self, graph, config, updatehash=False):
264265
graph=graph)
265266
else:
266267
logger.debug('Not submitting')
267-
self._wait()
268+
sleep(float(self._config['execution']['poll_sleep_duration']))
268269
self._remove_node_dirs()
269270
report_nodes_not_run(notrun)
270271

271-
272-
273-
def _wait(self):
274-
sleep(float(self._config['execution']['poll_sleep_duration']))
275-
276272
def _get_result(self, taskid):
277273
raise NotImplementedError
278274

@@ -414,18 +410,15 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
414410
else:
415411
break
416412

417-
def _task_finished_cb(self, jobid, result=None):
413+
def _task_finished_cb(self, jobid):
418414
""" Extract outputs and assign to inputs of dependent tasks
419415
420416
This is called when a job is completed.
421417
"""
422418
logger.info('[Job finished] jobname: %s jobid: %d' %
423419
(self.procs[jobid]._id, jobid))
424420
if self._status_callback:
425-
if result == None:
426-
if self._taskresult.has_key(jobid):
427-
result = self._taskresult[jobid].get()
428-
self._status_callback(self.procs[jobid], 'end', result)
421+
self._status_callback(self.procs[jobid], 'end')
429422
# Update job and worker queues
430423
self.proc_pending[jobid] = False
431424
# update the job dependency structure

nipype/pipeline/plugins/callback_log.py

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)