
Commit fa866eb

Merge pull request #1756 from ccraddock/master
FIX MultiProc deadlock - thank you @ccraddock closes #1692
2 parents dbfa85a + e1d8ec9 · commit fa866eb

8 files changed: +59 −55 lines
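In short: the MultiProc plugin previously parked its scheduler in `semaphore_singleton.semaphore.acquire()` and relied on a worker callback to call `release()`; any missed release left the run blocked forever. This commit replaces the semaphore with a `threading.Event` whose `wait()` takes a timeout, so a lost wake-up now costs at most one poll interval instead of a deadlock. A minimal standalone sketch of the two wake-up patterns (illustration only, not nipype code):

import threading

# Old pattern (deadlock-prone): block until a worker callback releases
# the semaphore; if the release is ever missed, acquire() hangs forever.
semaphore = threading.Semaphore(0)

def wait_old():
    semaphore.acquire()  # no timeout

# New pattern (bounded): wait on an Event with a timeout, then re-poll.
event = threading.Event()

def wait_new(timeout=2.0):
    sig_received = event.wait(timeout)  # returns False on timeout
    if not sig_received:
        pass  # timed out; the scheduler simply polls the task table again
    event.clear()

def on_task_done(result):
    event.set()  # worker callback: wake the scheduler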

circle.yml

Lines changed: 1 addition & 2 deletions
@@ -49,8 +49,7 @@ test:
     - docker run -v /etc/localtime:/etc/localtime:ro -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_dartel Linear /root/examples/ l2pipeline :
         timeout: 1600
     - docker run -v /etc/localtime:/etc/localtime:ro -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_fsl_reuse Linear /root/examples/ level1_workflow
-    # Disabled until https://github.com/nipy/nipype/issues/1692 is resolved
-    # - docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py27 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
+    - docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py27 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
     - docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ level1
     - docker run -v /etc/localtime:/etc/localtime:ro -e NIPYPE_NUMBER_OF_CPUS=4 -v ~/examples:/root/examples:ro -v ~/scratch:/scratch -w /scratch nipype/nipype_test:py35 /usr/bin/run_examples.sh fmri_spm_nested MultiProc /root/examples/ l2pipeline

nipype/interfaces/tests/test_runtime_profiler.py

Lines changed: 0 additions & 2 deletions
@@ -119,8 +119,6 @@ def _use_gb_ram(num_gb):


 # Test case for the run function
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 class TestRuntimeProfiler():
     '''
     This class is a test case for the runtime profiler

nipype/pipeline/engine/tests/test_engine.py

Lines changed: 0 additions & 2 deletions
@@ -626,8 +626,6 @@ def func1(in1):
     assert not error_raised


-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 def test_serial_input(tmpdir):
     wd = str(tmpdir)
     os.chdir(wd)

nipype/pipeline/plugins/base.py

Lines changed: 7 additions & 1 deletion
@@ -272,11 +272,17 @@ def run(self, graph, config, updatehash=False):
             self._remove_node_dirs()
         report_nodes_not_run(notrun)

-
+        # close any open resources
+        self._close()

     def _wait(self):
         sleep(float(self._config['execution']['poll_sleep_duration']))

+    def _close(self):
+        # close any open resources, this could raise NotImplementedError
+        # but I didn't want to break other plugins
+        return True
+
     def _get_result(self, taskid):
         raise NotImplementedError
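The new `_close()` hook gives `run()` a guaranteed point to release resources once the scheduling loop ends; the base implementation is a no-op so existing plugins keep working, and pool-backed plugins override it. A minimal sketch of the intended override pattern (the `ExamplePoolPlugin` name is hypothetical, not nipype code):

from multiprocessing import Pool

class PluginBase(object):
    def finish_run(self):
        # called once at the end of run(); mirrors the base.py change above
        self._close()

    def _close(self):
        # default: nothing to clean up
        return True

class ExamplePoolPlugin(PluginBase):  # hypothetical subclass
    def __init__(self, n_procs=4):
        self.pool = Pool(processes=n_procs)

    def _close(self):
        self.pool.close()  # stop accepting new work, as MultiProcPlugin does
        return True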

nipype/pipeline/plugins/multiproc.py

Lines changed: 51 additions & 23 deletions
@@ -11,6 +11,7 @@

 # Import packages
 from multiprocessing import Process, Pool, cpu_count, pool
+import threading
 from traceback import format_exception
 import sys

@@ -20,14 +21,13 @@
 from ... import logging, config
 from ...utils.misc import str2bool
 from ..engine import MapNode
-from ..plugins import semaphore_singleton
 from .base import (DistributedPluginBase, report_crash)

 # Init logger
 logger = logging.getLogger('workflow')

 # Run node
-def run_node(node, updatehash):
+def run_node(node, updatehash, taskid):
     """Function to execute node.run(), catch and log any errors and
     return the result dictionary

@@ -45,7 +45,7 @@ def run_node(node, updatehash, taskid):
     """

     # Init variables
-    result = dict(result=None, traceback=None)
+    result = dict(result=None, traceback=None, taskid=taskid)

     # Try and execute the node via node.run()
     try:

@@ -77,10 +77,6 @@ class NonDaemonPool(pool.Pool):
     Process = NonDaemonProcess


-def release_lock(args):
-    semaphore_singleton.semaphore.release()
-
-
 # Get total system RAM
 def get_system_total_memory_gb():
     """Function to get the total RAM of the running system in GB

@@ -136,12 +132,18 @@ def __init__(self, plugin_args=None):
         # Init variables and instance attributes
         super(MultiProcPlugin, self).__init__(plugin_args=plugin_args)
         self._taskresult = {}
+        self._task_obj = {}
         self._taskid = 0
         non_daemon = True
         self.plugin_args = plugin_args
         self.processors = cpu_count()
         self.memory_gb = get_system_total_memory_gb()*0.9  # 90% of system memory

+        self._timeout = 2.0
+        self._event = threading.Event()
+
+
         # Check plugin args
         if self.plugin_args:
             if 'non_daemon' in self.plugin_args:

@@ -150,6 +152,9 @@ def __init__(self, plugin_args=None):
                 self.processors = self.plugin_args['n_procs']
             if 'memory_gb' in self.plugin_args:
                 self.memory_gb = self.plugin_args['memory_gb']
+
+        logger.debug("MultiProcPlugin starting %d threads in pool" % (self.processors))
+
         # Instantiate different thread pools for non-daemon processes
         if non_daemon:
             # run the execution using the non-daemon pool subclass

@@ -159,14 +164,23 @@ def __init__(self, plugin_args=None):

     def _wait(self):
         if len(self.pending_tasks) > 0:
-            semaphore_singleton.semaphore.acquire()
+            if self._config['execution']['poll_sleep_duration']:
+                self._timeout = float(self._config['execution']['poll_sleep_duration'])
+            sig_received = self._event.wait(self._timeout)
+            if not sig_received:
+                logger.debug('MultiProcPlugin timeout before signal received. Deadlock averted??')
+            self._event.clear()
+
+    def _async_callback(self, args):
+        self._taskresult[args['taskid']] = args
+        self._event.set()

     def _get_result(self, taskid):
         if taskid not in self._taskresult:
-            raise RuntimeError('Multiproc task %d not found' % taskid)
-        if not self._taskresult[taskid].ready():
-            return None
-        return self._taskresult[taskid].get()
+            result = None
+        else:
+            result = self._taskresult[taskid]
+        return result

     def _report_crash(self, node, result=None):
         if result and result['traceback']:

@@ -178,36 +192,50 @@ def _report_crash(self, node, result=None):
         return report_crash(node)

     def _clear_task(self, taskid):
-        del self._taskresult[taskid]
+        del self._task_obj[taskid]

     def _submit_job(self, node, updatehash=False):
         self._taskid += 1
         if hasattr(node.inputs, 'terminal_output'):
             if node.inputs.terminal_output == 'stream':
                 node.inputs.terminal_output = 'allatonce'

-        self._taskresult[self._taskid] = \
+        self._task_obj[self._taskid] = \
             self.pool.apply_async(run_node,
-                                  (node, updatehash),
-                                  callback=release_lock)
+                                  (node, updatehash, self._taskid),
+                                  callback=self._async_callback)
         return self._taskid

+    def _close(self):
+        self.pool.close()
+        return True
+
     def _send_procs_to_workers(self, updatehash=False, graph=None):
         """ Sends jobs to workers when system resources are available.
            Check memory (gb) and cores usage before running jobs.
         """
         executing_now = []

         # Check to see if a job is available
-        jobids = np.flatnonzero((self.proc_pending == True) & \
+        currently_running_jobids = np.flatnonzero((self.proc_pending == True) & \
                                 (self.depidx.sum(axis=0) == 0).__array__())

         # Check available system resources by summing all threads and memory used
         busy_memory_gb = 0
         busy_processors = 0
-        for jobid in jobids:
-            busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
-            busy_processors += self.procs[jobid]._interface.num_threads
+        for jobid in currently_running_jobids:
+            if self.procs[jobid]._interface.estimated_memory_gb <= self.memory_gb and \
+               self.procs[jobid]._interface.num_threads <= self.processors:
+
+                busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb
+                busy_processors += self.procs[jobid]._interface.num_threads
+
+            else:
+                raise ValueError("Resources required by jobid %d (%f GB, %d threads) "
+                                 "exceed what is available on the system (%f GB, %d threads)" % (jobid,
+                                 self.procs[jobid]._interface.estimated_memory_gb,
+                                 self.procs[jobid]._interface.num_threads,
+                                 self.memory_gb, self.processors))

         free_memory_gb = self.memory_gb - busy_memory_gb
         free_processors = self.processors - busy_processors

@@ -271,8 +299,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                     hash_exists, _, _, _ = self.procs[
                         jobid].hash_exists()
                     logger.debug('Hash exists %s' % str(hash_exists))
-                    if (hash_exists and (self.procs[jobid].overwrite == False or \
-                        (self.procs[jobid].overwrite == None and \
+                    if (hash_exists and (self.procs[jobid].overwrite == False or
+                        (self.procs[jobid].overwrite == None and
                          not self.procs[jobid]._interface.always_run))):
                         self._task_finished_cb(jobid)
                         self._remove_node_dirs()

@@ -299,7 +327,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                         self._remove_node_dirs()

                     else:
-                        logger.debug('submitting %s' % str(jobid))
+                        logger.debug('MultiProcPlugin submitting %s' % str(jobid))
                         tid = self._submit_job(deepcopy(self.procs[jobid]),
                                                updatehash=updatehash)
                         if tid is None:
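With the semaphore gone, result bookkeeping moves into the plugin itself: `run_node` now returns its `taskid`, `_async_callback` files each finished result into `self._taskresult` and sets the event, and the `AsyncResult` handles live separately in `self._task_obj`, which makes `_get_result` a non-blocking dictionary lookup. A runnable standalone sketch of this callback wiring (illustration only; the real `run_node` executes a nipype node):

import threading
from multiprocessing import Pool

event = threading.Event()
task_results = {}  # taskid -> result dict, filled by the callback

def run_node(node, updatehash, taskid):
    # worker: return the taskid with the result so the callback can file it
    return dict(result=node, traceback=None, taskid=taskid)

def async_callback(args):
    # runs in the pool's result-handler thread inside the master process
    task_results[args['taskid']] = args
    event.set()  # wake the scheduler's bounded wait

if __name__ == '__main__':
    pool = Pool(2)
    # keep the AsyncResult handle, as MultiProcPlugin keeps self._task_obj
    handle = pool.apply_async(run_node, ('node-1', False, 1),
                              callback=async_callback)
    event.wait(2.0)             # bounded wait instead of a blocking acquire()
    print(task_results.get(1))  # non-blocking lookup; None if not finished yet
    pool.close()
    pool.join()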

nipype/pipeline/plugins/tests/test_callback.py

Lines changed: 0 additions & 6 deletions
@@ -64,9 +64,6 @@ def test_callback_exception(tmpdir):
     assert so.statuses[0][1] == 'start'
     assert so.statuses[1][1] == 'exception'

-
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 def test_callback_multiproc_normal(tmpdir):
     so = Status()
     wf = pe.Workflow(name='test', base_dir=str(tmpdir))

@@ -83,9 +80,6 @@ def test_callback_multiproc_normal(tmpdir):
     assert so.statuses[0][1] == 'start'
     assert so.statuses[1][1] == 'end'

-
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 def test_callback_multiproc_exception(tmpdir):
     so = Status()
     wf = pe.Workflow(name='test', base_dir=str(tmpdir))

nipype/pipeline/plugins/tests/test_multiproc.py

Lines changed: 0 additions & 10 deletions
@@ -32,9 +32,6 @@ def _list_outputs(self):
         outputs['output1'] = [1, self.inputs.input1]
         return outputs

-
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 def test_run_multiproc(tmpdir):
     os.chdir(str(tmpdir))

@@ -118,9 +115,6 @@ def find_metrics(nodes, last_node):

     return total_memory, total_threads

-
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 def test_no_more_memory_than_specified():
     LOG_FILENAME = 'callback.log'
     my_logger = logging.getLogger('callback')

@@ -179,10 +173,6 @@ def test_no_more_memory_than_specified():

     os.remove(LOG_FILENAME)

-
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
-@pytest.mark.skipif(nib.runtime_profile == False, reason="runtime_profile=False")
 def test_no_more_threads_than_specified():
     LOG_FILENAME = 'callback.log'
     my_logger = logging.getLogger('callback')

nipype/pipeline/plugins/tests/test_multiproc_nondaemon.py

Lines changed: 0 additions & 9 deletions
@@ -89,9 +89,6 @@ def dummyFunction(filename):

     return total

-
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 def run_multiproc_nondaemon_with_flag(nondaemon_flag):
     '''
     Start a pipe with two nodes using the resource multiproc plugin and

@@ -132,9 +129,6 @@ def run_multiproc_nondaemon_with_flag(nondaemon_flag):
     rmtree(temp_dir)
     return result

-
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 def test_run_multiproc_nondaemon_false():
     '''
     This is the entry point for the test. Two times a pipe of several multiprocessing jobs gets

@@ -151,9 +145,6 @@ def test_run_multiproc_nondaemon_false():
         shouldHaveFailed = True
     assert shouldHaveFailed

-
-@pytest.mark.skipif(sys.version_info < (3, 0),
-                    reason="Disabled until https://github.com/nipy/nipype/issues/1692 is resolved")
 def test_run_multiproc_nondaemon_true():
     # with nondaemon_flag = True, the execution should succeed
     result = run_multiproc_nondaemon_with_flag(True)
