Skip to content

Commit dccc3f5

Browse files
committed
RF: Futures-based MultiProc
1 parent 704b97d commit dccc3f5

File tree

1 file changed

+11
-42
lines changed

1 file changed

+11
-42
lines changed

nipype/pipeline/plugins/multiproc.py

Lines changed: 11 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
 
 # Import packages
 import os
-from multiprocessing import Process, Pool, cpu_count, pool
+from multiprocessing import cpu_count
+from concurrent.futures import ProcessPoolExecutor
 from traceback import format_exception
 import sys
 from logging import INFO
@@ -74,25 +75,6 @@ def run_node(node, updatehash, taskid):
     return result
 
 
-class NonDaemonProcess(Process):
-    """A non-daemon process to support internal multiprocessing.
-    """
-
-    def _get_daemon(self):
-        return False
-
-    def _set_daemon(self, value):
-        pass
-
-    daemon = property(_get_daemon, _set_daemon)
-
-
-class NonDaemonPool(pool.Pool):
-    """A process pool with non-daemon processes.
-    """
-    Process = NonDaemonProcess
-
-
 class MultiProcPlugin(DistributedPluginBase):
     """
     Execute workflow with multiprocessing, not sending more jobs at once
@@ -139,8 +121,6 @@ def __init__(self, plugin_args=None):
         self._cwd = os.getcwd()
 
         # Read in options or set defaults.
-        non_daemon = self.plugin_args.get('non_daemon', True)
-        maxtasks = self.plugin_args.get('maxtasksperchild', 10)
         self.processors = self.plugin_args.get('n_procs', cpu_count())
         self.memory_gb = self.plugin_args.get(
             'memory_gb',  # Allocate 90% of system memory
@@ -149,30 +129,19 @@ def __init__(self, plugin_args=None):
             True)
 
         # Instantiate different thread pools for non-daemon processes
-        logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
-                     'mem_gb=%0.2f, cwd=%s)', 'non' * int(non_daemon),
+        logger.debug('[MultiProc] Starting (n_procs=%d, '
+                     'mem_gb=%0.2f, cwd=%s)',
                      self.processors, self.memory_gb, self._cwd)
 
-        NipypePool = NonDaemonPool if non_daemon else Pool
-        try:
-            self.pool = NipypePool(
-                processes=self.processors,
-                maxtasksperchild=maxtasks,
-                initializer=os.chdir,
-                initargs=(self._cwd,)
-            )
-        except TypeError:
-            # Python < 3.2 does not have maxtasksperchild
-            # When maxtasksperchild is not set, initializer is not to be
-            # called
-            self.pool = NipypePool(processes=self.processors)
+        self.pool = ProcessPoolExecutor(max_workers=self.processors)
 
         self._stats = None
 
     def _async_callback(self, args):
         # Make sure runtime is not left at a dubious working directory
         os.chdir(self._cwd)
-        self._taskresult[args['taskid']] = args
+        result = args.result()
+        self._taskresult[result['taskid']] = result
 
     def _get_result(self, taskid):
         return self._taskresult.get(taskid)
@@ -187,9 +156,9 @@ def _submit_job(self, node, updatehash=False):
         if getattr(node.interface, 'terminal_output', '') == 'stream':
             node.interface.terminal_output = 'allatonce'
 
-        self._task_obj[self._taskid] = self.pool.apply_async(
-            run_node, (node, updatehash, self._taskid),
-            callback=self._async_callback)
+        result_future = self.pool.submit(run_node, node, updatehash, self._taskid)
+        result_future.add_done_callback(self._async_callback)
+        self._task_obj[self._taskid] = result_future
 
         logger.debug('[MultiProc] Submitted task %s (taskid=%d).',
                      node.fullname, self._taskid)
@@ -218,7 +187,7 @@ def _prerun_check(self, graph):
             raise RuntimeError('Insufficient resources available for job')
 
     def _postrun_check(self):
-        self.pool.close()
+        self.pool.shutdown()
 
     def _check_resources(self, running_tasks):
         """

0 commit comments

Comments
 (0)