@@ -129,9 +129,10 @@ def run(self, graph, config, updatehash=False):
129
129
old_presub_stats = None
130
130
while not np .all (self .proc_done ) or np .any (self .proc_pending ):
131
131
loop_start = time ()
132
- # Check to see if a job is available (jobs without dependencies not run)
133
- # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
134
- jobs_ready = np .nonzero (~ self .proc_done & (self .depidx .sum (0 ) == 0 ))[1 ]
132
+ # Check if a job is available (jobs with all dependencies run)
133
+ # https://github.com/nipy/nipype/pull/2200#discussion_r141605722
134
+ jobs_ready = np .nonzero (~ self .proc_done &
135
+ (self .depidx .sum (0 ) == 0 ))[1 ]
135
136
136
137
progress_stats = (len (self .proc_done ),
137
138
np .sum (self .proc_done ^ self .proc_pending ),
@@ -165,7 +166,8 @@ def run(self, graph, config, updatehash=False):
165
166
self ._remove_node_dirs ()
166
167
self ._clear_task (taskid )
167
168
else :
168
- assert self .proc_done [jobid ] and self .proc_pending [jobid ]
169
+ assert self .proc_done [jobid ] and \
170
+ self .proc_pending [jobid ]
169
171
toappend .insert (0 , (taskid , jobid ))
170
172
171
173
if toappend :
@@ -273,8 +275,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
273
275
if (num_jobs >= self .max_jobs ) or (slots == 0 ):
274
276
break
275
277
276
- # Check to see if a job is available (jobs without dependencies not run)
277
- # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
278
+ # Check if a job is available (jobs with all dependencies run)
279
+ # https://github.com/nipy/nipype/pull/2200#discussion_r141605722
278
280
jobids = np .nonzero (~ self .proc_done & (self .depidx .sum (0 ) == 0 ))[1 ]
279
281
280
282
if len (jobids ) > 0 :
@@ -327,7 +329,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
327
329
break
328
330
329
331
def _local_hash_check (self , jobid , graph ):
330
- if not str2bool (self .procs [jobid ].config ['execution' ]['local_hash_check' ]):
332
+ if not str2bool (self .procs [jobid ].config ['execution' ][
333
+ 'local_hash_check' ]):
331
334
return False
332
335
333
336
logger .debug ('Checking hash (%d) locally' , jobid )
@@ -398,8 +401,8 @@ def _remove_node_dirs(self):
398
401
"""Removes directories whose outputs have already been used up
399
402
"""
400
403
if str2bool (self ._config ['execution' ]['remove_node_directories' ]):
401
- for idx in np .nonzero (
402
- ( self . refidx . sum ( axis = 1 ) == 0 ). __array__ ())[ 0 ] :
404
+ indices = np .nonzero (( self . refidx . sum ( axis = 1 ) == 0 ). __array__ ())[ 0 ]
405
+ for idx in indices :
403
406
if idx in self .mapnodesubids :
404
407
continue
405
408
if self .proc_done [idx ] and (not self .proc_pending [idx ]):
@@ -514,7 +517,8 @@ class GraphPluginBase(PluginBase):
514
517
515
518
def __init__ (self , plugin_args = None ):
516
519
if plugin_args and plugin_args .get ('status_callback' ):
517
- logger .warning ('status_callback not supported for Graph submission plugins' )
520
+ logger .warning ('status_callback not supported for Graph submission'
521
+ ' plugins' )
518
522
super (GraphPluginBase , self ).__init__ (plugin_args = plugin_args )
519
523
520
524
def run (self , graph , config , updatehash = False ):
0 commit comments