diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index d7b7d64179..ff84937bc6 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -7,13 +7,13 @@ absolute_import) from builtins import range, object, open +import sys from copy import deepcopy from glob import glob import os import shutil -import sys from time import sleep, time -from traceback import format_exc +from traceback import format_exception import numpy as np import scipy.sparse as ssp @@ -159,7 +159,7 @@ def run(self, graph, config, updatehash=False): graph, result={ 'result': None, - 'traceback': format_exc() + 'traceback': '\n'.join(format_exception(*sys.exc_info())) })) else: if result: @@ -244,7 +244,7 @@ def _submit_mapnode(self, jobid): mapnodesubids = self.procs[jobid].get_subnodes() numnodes = len(mapnodesubids) logger.debug('Adding %d jobs for mapnode %s', numnodes, - self.procs[jobid]._id) + self.procs[jobid]) for i in range(numnodes): self.mapnodesubids[self.depidx.shape[0] + i] = jobid self.procs.extend(mapnodesubids) @@ -274,7 +274,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): slots = None else: slots = max(0, self.max_jobs - num_jobs) - logger.debug('Slots available: %s' % slots) + logger.debug('Slots available: %s', slots) if (num_jobs >= self.max_jobs) or (slots == 0): break @@ -303,14 +303,14 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.proc_done[jobid] = True self.proc_pending[jobid] = True # Send job to task manager and add to pending tasks - logger.info('Submitting: %s ID: %d' % - (self.procs[jobid]._id, jobid)) + logger.info('Submitting: %s ID: %d', + self.procs[jobid], jobid) if self._status_callback: self._status_callback(self.procs[jobid], 'start') if not self._local_hash_check(jobid, graph): if self.procs[jobid].run_without_submitting: - logger.debug('Running node %s on master thread' % + logger.debug('Running node %s on master thread', self.procs[jobid]) try: self.procs[jobid].run() @@ -327,8 +327,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.proc_pending[jobid] = False else: self.pending_tasks.insert(0, (tid, jobid)) - logger.info('Finished submitting: %s ID: %d' % - (self.procs[jobid]._id, jobid)) + logger.info('Finished submitting: %s ID: %d', + self.procs[jobid], jobid) else: break @@ -337,22 +337,38 @@ def _local_hash_check(self, jobid, graph): self.procs[jobid].config['execution']['local_hash_check']): return False - cached, updated = self.procs[jobid].is_cached() + try: + cached, updated = self.procs[jobid].is_cached() + except Exception: + logger.warning( + 'Error while checking node hash, forcing re-run. ' + 'Although this error may not prevent the workflow from running, ' + 'it could indicate a major problem. Please report a new issue ' + 'at https://github.com/nipy/nipype/issues adding the following ' + 'information:\n\n\tNode: %s\n\tInterface: %s.%s\n\tTraceback:\n%s', + self.procs[jobid], + self.procs[jobid].interface.__module__, + self.procs[jobid].interface.__class__.__name__, + '\n'.join(format_exception(*sys.exc_info())) + ) + return False + logger.debug('Checking hash "%s" locally: cached=%s, updated=%s.', - self.procs[jobid].fullname, cached, updated) + self.procs[jobid], cached, updated) overwrite = self.procs[jobid].overwrite - always_run = self.procs[jobid]._interface.always_run + always_run = self.procs[jobid].interface.always_run if cached and updated and (overwrite is False or overwrite is None and not always_run): logger.debug('Skipping cached node %s with ID %s.', - self.procs[jobid]._id, jobid) + self.procs[jobid], jobid) try: self._task_finished_cb(jobid, cached=True) self._remove_node_dirs() except Exception: - logger.debug('Error skipping cached node %s (%s).', - self.procs[jobid]._id, jobid) + logger.debug('Error skipping cached node %s (%s).\n\n%s', + self.procs[jobid], jobid, + '\n'.join(format_exception(*sys.exc_info()))) self._clean_queue(jobid, graph) self.proc_pending[jobid] = False return True @@ -364,7 +380,7 @@ def _task_finished_cb(self, jobid, cached=False): This is called when a job is completed. """ logger.info('[Job %d] %s (%s).', jobid, 'Cached' - if cached else 'Completed', self.procs[jobid].fullname) + if cached else 'Completed', self.procs[jobid]) if self._status_callback: self._status_callback(self.procs[jobid], 'end') # Update job and worker queues @@ -481,7 +497,7 @@ def _get_result(self, taskid): taskid, timeout, node_dir)) raise IOError(error_message) except IOError as e: - result_data['traceback'] = format_exc() + result_data['traceback'] = '\n'.join(format_exception(*sys.exc_info())) else: results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0] result_data = loadpkl(results_file)