@@ -122,13 +122,16 @@ def __init__(self, plugin_args=None):
122
122
non_daemon = self .plugin_args .get ('non_daemon' , True )
123
123
maxtasks = self .plugin_args .get ('maxtasksperchild' , 10 )
124
124
self .processors = self .plugin_args .get ('n_procs' , cpu_count ())
125
- self .memory_gb = self .plugin_args .get ('memory_gb' , # Allocate 90% of system memory
126
- get_system_total_memory_gb () * 0.9 )
127
- self .raise_insufficient = self .plugin_args .get ('raise_insufficient' , True )
125
+ self .memory_gb = self .plugin_args .get (
126
+ 'memory_gb' , # Allocate 90% of system memory
127
+ get_system_total_memory_gb () * 0.9 )
128
+ self .raise_insufficient = self .plugin_args .get ('raise_insufficient' ,
129
+ True )
128
130
129
131
# Instantiate different thread pools for non-daemon processes
130
- logger .debug ('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)' ,
131
- 'non' * int (non_daemon ), self .processors , self .memory_gb )
132
+ logger .debug ('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d,'
133
+ 'mem_gb=%0.2f)' , 'non' * int (non_daemon ), self .processors ,
134
+ self .memory_gb )
132
135
133
136
NipypePool = NonDaemonPool if non_daemon else Pool
134
137
try :
@@ -205,12 +208,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
205
208
Sends jobs to workers when system resources are available.
206
209
"""
207
210
208
- # Check to see if a job is available (jobs without dependencies not run)
211
+ # Check to see if a job is available (jobs with all dependencies run)
209
212
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
210
213
jobids = np .nonzero (~ self .proc_done & (self .depidx .sum (0 ) == 0 ))[1 ]
211
214
212
- # Check available system resources by summing all threads and memory used
213
- free_memory_gb , free_processors = self ._check_resources (self .pending_tasks )
215
+ # Check available resources by summing all threads and memory used
216
+ free_memory_gb , free_processors = self ._check_resources (
217
+ self .pending_tasks )
214
218
215
219
stats = (len (self .pending_tasks ), len (jobids ), free_memory_gb ,
216
220
self .memory_gb , free_processors , self .processors )
@@ -229,7 +233,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
229
233
'be submitted to the queue. Potential deadlock' )
230
234
return
231
235
232
- jobids = self ._sort_jobs (jobids , scheduler = self .plugin_args .get ('scheduler' ))
236
+ jobids = self ._sort_jobs (jobids ,
237
+ scheduler = self .plugin_args .get ('scheduler' ))
233
238
234
239
# Run garbage collector before potentially submitting jobs
235
240
gc .collect ()
@@ -265,9 +270,10 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
265
270
266
271
free_memory_gb -= next_job_gb
267
272
free_processors -= next_job_th
268
- logger .debug ('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.' ,
269
- self .procs [jobid ].fullname , jobid , next_job_gb , next_job_th ,
270
- free_memory_gb , free_processors )
273
+ logger .debug ('Allocating %s ID=%d (%0.2fGB, %d threads). Free: '
274
+ '%0.2fGB, %d threads.' , self .procs [jobid ].fullname ,
275
+ jobid , next_job_gb , next_job_th , free_memory_gb ,
276
+ free_processors )
271
277
272
278
# change job status in appropriate queues
273
279
self .proc_done [jobid ] = True
0 commit comments