Commit c7492c8

Merge pull request #2410 from oesteban/enh/error-checking-local-cache
[ENH] Delay crashing if exception is raised in local hash check
2 parents: b2c7130 + a3722aa
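In short: _local_hash_check now wraps the cache check in a try/except. If the hash check itself raises, the error is logged with its full traceback and the node is treated as not cached, so the workflow keeps going and the node is re-run instead of the plugin crashing at submission time. A minimal, self-contained sketch of that pattern (the node argument and its is_cached() method here are simplified stand-ins, not the real nipype API):

import logging
import sys
from traceback import format_exception

logger = logging.getLogger('demo')


def local_hash_check(node):
    """Return True only when the node can safely be skipped as cached.

    Any exception raised by the hash check is logged with its traceback
    and the node is reported as not cached, so a potential crash is
    delayed until the node is actually (re-)run.
    """
    try:
        cached, updated = node.is_cached()  # stand-in for the real check
    except Exception:
        logger.warning('Error while checking node hash, forcing re-run:\n%s',
                       '\n'.join(format_exception(*sys.exc_info())))
        return False
    return cached and updated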

File tree

1 file changed (+34, -18 lines)

nipype/pipeline/plugins/base.py

Lines changed: 34 additions & 18 deletions
@@ -7,13 +7,13 @@
                     absolute_import)
 from builtins import range, object, open
 
+import sys
 from copy import deepcopy
 from glob import glob
 import os
 import shutil
-import sys
 from time import sleep, time
-from traceback import format_exc
+from traceback import format_exception
 
 import numpy as np
 import scipy.sparse as ssp
@@ -159,7 +159,7 @@ def run(self, graph, config, updatehash=False):
                         graph,
                         result={
                             'result': None,
-                            'traceback': format_exc()
+                            'traceback': '\n'.join(format_exception(*sys.exc_info()))
                         }))
                 else:
                     if result:
@@ -244,7 +244,7 @@ def _submit_mapnode(self, jobid):
         mapnodesubids = self.procs[jobid].get_subnodes()
         numnodes = len(mapnodesubids)
         logger.debug('Adding %d jobs for mapnode %s', numnodes,
-                     self.procs[jobid]._id)
+                     self.procs[jobid])
         for i in range(numnodes):
             self.mapnodesubids[self.depidx.shape[0] + i] = jobid
         self.procs.extend(mapnodesubids)
@@ -274,7 +274,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 slots = None
             else:
                 slots = max(0, self.max_jobs - num_jobs)
-            logger.debug('Slots available: %s' % slots)
+            logger.debug('Slots available: %s', slots)
             if (num_jobs >= self.max_jobs) or (slots == 0):
                 break
 
@@ -303,14 +303,14 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 self.proc_done[jobid] = True
                 self.proc_pending[jobid] = True
                 # Send job to task manager and add to pending tasks
-                logger.info('Submitting: %s ID: %d' %
-                            (self.procs[jobid]._id, jobid))
+                logger.info('Submitting: %s ID: %d',
+                            self.procs[jobid], jobid)
                 if self._status_callback:
                     self._status_callback(self.procs[jobid], 'start')
 
                 if not self._local_hash_check(jobid, graph):
                     if self.procs[jobid].run_without_submitting:
-                        logger.debug('Running node %s on master thread' %
+                        logger.debug('Running node %s on master thread',
                                      self.procs[jobid])
                         try:
                             self.procs[jobid].run()
@@ -327,8 +327,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                         self.proc_pending[jobid] = False
                     else:
                         self.pending_tasks.insert(0, (tid, jobid))
-                    logger.info('Finished submitting: %s ID: %d' %
-                                (self.procs[jobid]._id, jobid))
+                    logger.info('Finished submitting: %s ID: %d',
+                                self.procs[jobid], jobid)
                 else:
                     break
 
@@ -337,22 +337,38 @@ def _local_hash_check(self, jobid, graph):
                 self.procs[jobid].config['execution']['local_hash_check']):
             return False
 
-        cached, updated = self.procs[jobid].is_cached()
+        try:
+            cached, updated = self.procs[jobid].is_cached()
+        except Exception:
+            logger.warning(
+                'Error while checking node hash, forcing re-run. '
+                'Although this error may not prevent the workflow from running, '
+                'it could indicate a major problem. Please report a new issue '
+                'at https://github.com/nipy/nipype/issues adding the following '
+                'information:\n\n\tNode: %s\n\tInterface: %s.%s\n\tTraceback:\n%s',
+                self.procs[jobid],
+                self.procs[jobid].interface.__module__,
+                self.procs[jobid].interface.__class__.__name__,
+                '\n'.join(format_exception(*sys.exc_info()))
+            )
+            return False
+
         logger.debug('Checking hash "%s" locally: cached=%s, updated=%s.',
-                     self.procs[jobid].fullname, cached, updated)
+                     self.procs[jobid], cached, updated)
         overwrite = self.procs[jobid].overwrite
-        always_run = self.procs[jobid]._interface.always_run
+        always_run = self.procs[jobid].interface.always_run
 
         if cached and updated and (overwrite is False or
                                    overwrite is None and not always_run):
             logger.debug('Skipping cached node %s with ID %s.',
-                         self.procs[jobid]._id, jobid)
+                         self.procs[jobid], jobid)
             try:
                 self._task_finished_cb(jobid, cached=True)
                 self._remove_node_dirs()
             except Exception:
-                logger.debug('Error skipping cached node %s (%s).',
-                             self.procs[jobid]._id, jobid)
+                logger.debug('Error skipping cached node %s (%s).\n\n%s',
+                             self.procs[jobid], jobid,
+                             '\n'.join(format_exception(*sys.exc_info())))
                 self._clean_queue(jobid, graph)
                 self.proc_pending[jobid] = False
             return True
@@ -364,7 +380,7 @@ def _task_finished_cb(self, jobid, cached=False):
         This is called when a job is completed.
         """
         logger.info('[Job %d] %s (%s).', jobid, 'Cached'
-                    if cached else 'Completed', self.procs[jobid].fullname)
+                    if cached else 'Completed', self.procs[jobid])
         if self._status_callback:
             self._status_callback(self.procs[jobid], 'end')
         # Update job and worker queues
@@ -481,7 +497,7 @@ def _get_result(self, taskid):
                     taskid, timeout, node_dir))
                 raise IOError(error_message)
             except IOError as e:
-                result_data['traceback'] = format_exc()
+                result_data['traceback'] = '\n'.join(format_exception(*sys.exc_info()))
         else:
             results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0]
             result_data = loadpkl(results_file)
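On the traceback side, format_exc() is replaced throughout with '\n'.join(format_exception(*sys.exc_info())). format_exception() returns a list of newline-terminated strings, so a '\n'-join produces the same traceback with an extra blank line between entries, while format_exc() is equivalent to joining that list with the empty string. A quick standalone check of that relationship (not nipype code):

import sys
from traceback import format_exc, format_exception

try:
    1 / 0
except ZeroDivisionError:
    parts = format_exception(*sys.exc_info())  # list of newline-terminated strings
    assert format_exc() == ''.join(parts)      # format_exc() is the ''-join
    print('\n'.join(parts))                    # what the plugin now stores: extra blank lines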

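The remaining hunks are logging cleanups: message arguments are passed to the logger instead of being %-interpolated at the call site, so formatting only happens when the record is actually emitted; the node object itself is handed to %s (which calls str() on it) in place of the private _id and fullname attributes; and the public interface accessor replaces _interface. A small standalone illustration of the eager vs. lazy difference (demo values, not nipype code):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('demo')

node_name, jobid = 'smooth', 3

# Eager: the string is always built, even though DEBUG is disabled here.
logger.debug('Submitting: %s ID: %d' % (node_name, jobid))

# Lazy: arguments are interpolated only if the record is emitted.
logger.debug('Submitting: %s ID: %d', node_name, jobid)
logger.info('Submitting: %s ID: %d', node_name, jobid)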