
[ENH] Delay crashing if exception is raised in local hash check #2410

Merged: 6 commits, Feb 5, 2018 (showing changes from all commits)

52 changes: 34 additions & 18 deletions nipype/pipeline/plugins/base.py
@@ -7,13 +7,13 @@
                         absolute_import)
 from builtins import range, object, open
 
+import sys
 from copy import deepcopy
 from glob import glob
 import os
 import shutil
-import sys
 from time import sleep, time
-from traceback import format_exc
+from traceback import format_exception
 
 import numpy as np
 import scipy.sparse as ssp
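
The import swap above replaces the `format_exc()` shortcut with the more explicit `format_exception(*sys.exc_info())` used throughout the rest of the diff. A minimal standalone sketch of how the two relate (not nipype code; the try/except exists only to have a live traceback):

```python
import sys
from traceback import format_exc, format_exception

try:
    1 / 0
except ZeroDivisionError:
    as_string = format_exc()                      # one preformatted string
    as_lines = format_exception(*sys.exc_info())  # list of '\n'-terminated lines
    assert as_string == ''.join(as_lines)         # format_exc is the joined form
    print('\n'.join(as_lines))                    # the joining style this PR uses
```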
@@ -159,7 +159,7 @@ def run(self, graph, config, updatehash=False):
                             graph,
                             result={
                                 'result': None,
-                                'traceback': format_exc()
+                                'traceback': '\n'.join(format_exception(*sys.exc_info()))
                             }))
                 else:
                     if result:
@@ -244,7 +244,7 @@ def _submit_mapnode(self, jobid):
         mapnodesubids = self.procs[jobid].get_subnodes()
         numnodes = len(mapnodesubids)
         logger.debug('Adding %d jobs for mapnode %s', numnodes,
-                     self.procs[jobid]._id)
+                     self.procs[jobid])
         for i in range(numnodes):
             self.mapnodesubids[self.depidx.shape[0] + i] = jobid
         self.procs.extend(mapnodesubids)
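
This hunk (and several below) passes the node object itself to the `%s` placeholder instead of reaching into the private `_id` attribute. A hypothetical sketch of why that works (the `Node` class here is made up; the assumption, which this diff relies on, is that nipype nodes stringify to a readable full name):

```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('demo')

class Node(object):
    """Stand-in for a nipype node; only its string form matters here."""
    def __init__(self, fullname):
        self.fullname = fullname
    def __str__(self):
        return self.fullname

# str(node) is called by the logging machinery, and only if the
# record actually passes the level filter.
logger.debug('Adding %d jobs for mapnode %s', 3, Node('preproc.realign'))
```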
@@ -274,7 +274,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 slots = None
             else:
                 slots = max(0, self.max_jobs - num_jobs)
-            logger.debug('Slots available: %s' % slots)
+            logger.debug('Slots available: %s', slots)
             if (num_jobs >= self.max_jobs) or (slots == 0):
                 break
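
The change above is the recurring logging cleanup in this PR: arguments are handed to the logger rather than interpolated eagerly with `%`. A small sketch of the difference, assuming only the standard library `logging` module:

```python
import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records are filtered out
logger = logging.getLogger('demo')

slots = 4
logger.debug('Slots available: %s' % slots)  # eager: string built even though never emitted
logger.debug('Slots available: %s', slots)   # lazy: filtered before any formatting happens
```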

@@ -303,14 +303,14 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                     self.proc_done[jobid] = True
                     self.proc_pending[jobid] = True
                     # Send job to task manager and add to pending tasks
-                    logger.info('Submitting: %s ID: %d' %
-                                (self.procs[jobid]._id, jobid))
+                    logger.info('Submitting: %s ID: %d',
+                                self.procs[jobid], jobid)
                     if self._status_callback:
                         self._status_callback(self.procs[jobid], 'start')
 
                     if not self._local_hash_check(jobid, graph):
                         if self.procs[jobid].run_without_submitting:
-                            logger.debug('Running node %s on master thread' %
+                            logger.debug('Running node %s on master thread',
                                          self.procs[jobid])
                             try:
                                 self.procs[jobid].run()
@@ -327,8 +327,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                                 self.proc_pending[jobid] = False
                     else:
                         self.pending_tasks.insert(0, (tid, jobid))
-                    logger.info('Finished submitting: %s ID: %d' %
-                                (self.procs[jobid]._id, jobid))
+                    logger.info('Finished submitting: %s ID: %d',
+                                self.procs[jobid], jobid)
                 else:
                     break
 
@@ -337,22 +337,38 @@ def _local_hash_check(self, jobid, graph):
                 self.procs[jobid].config['execution']['local_hash_check']):
             return False
 
-        cached, updated = self.procs[jobid].is_cached()
+        try:
+            cached, updated = self.procs[jobid].is_cached()
+        except Exception:
+            logger.warning(
+                'Error while checking node hash, forcing re-run. '
+                'Although this error may not prevent the workflow from running, '
+                'it could indicate a major problem. Please report a new issue '
+                'at https://github.com/nipy/nipype/issues adding the following '
+                'information:\n\n\tNode: %s\n\tInterface: %s.%s\n\tTraceback:\n%s',
+                self.procs[jobid],
+                self.procs[jobid].interface.__module__,
+                self.procs[jobid].interface.__class__.__name__,
+                '\n'.join(format_exception(*sys.exc_info()))
+            )
+            return False
 
         logger.debug('Checking hash "%s" locally: cached=%s, updated=%s.',
-                     self.procs[jobid].fullname, cached, updated)
+                     self.procs[jobid], cached, updated)
         overwrite = self.procs[jobid].overwrite
-        always_run = self.procs[jobid]._interface.always_run
+        always_run = self.procs[jobid].interface.always_run
 
         if cached and updated and (overwrite is False or
                                    overwrite is None and not always_run):
             logger.debug('Skipping cached node %s with ID %s.',
-                         self.procs[jobid]._id, jobid)
+                         self.procs[jobid], jobid)
             try:
                 self._task_finished_cb(jobid, cached=True)
                 self._remove_node_dirs()
             except Exception:
-                logger.debug('Error skipping cached node %s (%s).',
-                             self.procs[jobid]._id, jobid)
+                logger.debug('Error skipping cached node %s (%s).\n\n%s',
+                             self.procs[jobid], jobid,
+                             '\n'.join(format_exception(*sys.exc_info())))
                 self._clean_queue(jobid, graph)
                 self.proc_pending[jobid] = False
         return True
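
This hunk is the change the PR title describes: an exception raised while *checking* the hash no longer crashes the scheduler immediately; the node is reported and re-run instead. A condensed, hypothetical sketch of that control flow (the `overwrite`/`always_run` bookkeeping and the `jobid` indirection are omitted, and `node.is_cached()` is assumed to return a `(cached, updated)` pair as in the diff):

```python
import logging
import sys
from traceback import format_exception

logger = logging.getLogger('demo')

def local_hash_check(node):
    """Return True only when the node can safely be skipped as cached."""
    try:
        cached, updated = node.is_cached()
    except Exception:
        logger.warning('Error while checking node hash, forcing re-run:\n%s',
                       '\n'.join(format_exception(*sys.exc_info())))
        return False  # not skippable: fall through to a normal (re-)submission
    return cached and updated
```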
@@ -364,7 +380,7 @@ def _task_finished_cb(self, jobid, cached=False):
         This is called when a job is completed.
         """
         logger.info('[Job %d] %s (%s).', jobid, 'Cached'
-                    if cached else 'Completed', self.procs[jobid].fullname)
+                    if cached else 'Completed', self.procs[jobid])
         if self._status_callback:
             self._status_callback(self.procs[jobid], 'end')
         # Update job and worker queues
@@ -481,7 +497,7 @@ def _get_result(self, taskid):
                                  taskid, timeout, node_dir))
                 raise IOError(error_message)
             except IOError as e:
-                result_data['traceback'] = format_exc()
+                result_data['traceback'] = '\n'.join(format_exception(*sys.exc_info()))
             else:
                 results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0]
                 result_data = loadpkl(results_file)