11
11
12
12
# Import packages
13
13
import os
14
- from multiprocessing import Process , Pool , cpu_count , pool
14
+ from multiprocessing import cpu_count
15
+ from concurrent .futures import ProcessPoolExecutor
15
16
from traceback import format_exception
16
17
import sys
17
18
from logging import INFO
@@ -74,25 +75,6 @@ def run_node(node, updatehash, taskid):
74
75
return result
75
76
76
77
77
- class NonDaemonProcess (Process ):
78
- """A non-daemon process to support internal multiprocessing.
79
- """
80
-
81
- def _get_daemon (self ):
82
- return False
83
-
84
- def _set_daemon (self , value ):
85
- pass
86
-
87
- daemon = property (_get_daemon , _set_daemon )
88
-
89
-
90
- class NonDaemonPool (pool .Pool ):
91
- """A process pool with non-daemon processes.
92
- """
93
- Process = NonDaemonProcess
94
-
95
-
96
78
class MultiProcPlugin (DistributedPluginBase ):
97
79
"""
98
80
Execute workflow with multiprocessing, not sending more jobs at once
@@ -139,8 +121,6 @@ def __init__(self, plugin_args=None):
139
121
self ._cwd = os .getcwd ()
140
122
141
123
# Read in options or set defaults.
142
- non_daemon = self .plugin_args .get ('non_daemon' , True )
143
- maxtasks = self .plugin_args .get ('maxtasksperchild' , 10 )
144
124
self .processors = self .plugin_args .get ('n_procs' , cpu_count ())
145
125
self .memory_gb = self .plugin_args .get (
146
126
'memory_gb' , # Allocate 90% of system memory
@@ -149,30 +129,19 @@ def __init__(self, plugin_args=None):
149
129
True )
150
130
151
131
# Instantiate different thread pools for non-daemon processes
152
- logger .debug ('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
153
- 'mem_gb=%0.2f, cwd=%s)' , 'non' * int ( non_daemon ),
132
+ logger .debug ('[MultiProc] Starting (n_procs=%d, '
133
+ 'mem_gb=%0.2f, cwd=%s)' ,
154
134
self .processors , self .memory_gb , self ._cwd )
155
135
156
- NipypePool = NonDaemonPool if non_daemon else Pool
157
- try :
158
- self .pool = NipypePool (
159
- processes = self .processors ,
160
- maxtasksperchild = maxtasks ,
161
- initializer = os .chdir ,
162
- initargs = (self ._cwd ,)
163
- )
164
- except TypeError :
165
- # Python < 3.2 does not have maxtasksperchild
166
- # When maxtasksperchild is not set, initializer is not to be
167
- # called
168
- self .pool = NipypePool (processes = self .processors )
136
+ self .pool = ProcessPoolExecutor (max_workers = self .processors )
169
137
170
138
self ._stats = None
171
139
172
140
def _async_callback (self , args ):
173
141
# Make sure runtime is not left at a dubious working directory
174
142
os .chdir (self ._cwd )
175
- self ._taskresult [args ['taskid' ]] = args
143
+ result = args .result ()
144
+ self ._taskresult [result ['taskid' ]] = result
176
145
177
146
def _get_result (self , taskid ):
178
147
return self ._taskresult .get (taskid )
@@ -187,9 +156,9 @@ def _submit_job(self, node, updatehash=False):
187
156
if getattr (node .interface , 'terminal_output' , '' ) == 'stream' :
188
157
node .interface .terminal_output = 'allatonce'
189
158
190
- self . _task_obj [ self . _taskid ] = self .pool .apply_async (
191
- run_node , ( node , updatehash , self ._taskid ),
192
- callback = self ._async_callback )
159
+ result_future = self .pool .submit ( run_node , node , updatehash , self . _taskid )
160
+ result_future . add_done_callback ( self ._async_callback )
161
+ self ._task_obj [ self . _taskid ] = result_future
193
162
194
163
logger .debug ('[MultiProc] Submitted task %s (taskid=%d).' ,
195
164
node .fullname , self ._taskid )
@@ -218,7 +187,7 @@ def _prerun_check(self, graph):
218
187
raise RuntimeError ('Insufficient resources available for job' )
219
188
220
189
def _postrun_check (self ):
221
- self .pool .close ()
190
+ self .pool .shutdown ()
222
191
223
192
def _check_resources (self , running_tasks ):
224
193
"""
0 commit comments