From 8579a3ce29b4eebbdcbd2a18f04f3126a895ac89 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Fri, 26 Jan 2018 12:55:17 -0800 Subject: [PATCH 1/5] deal reliably with errors while checking hash --- nipype/pipeline/plugins/base.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index d7b7d64179..c48eec4353 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -7,13 +7,13 @@ absolute_import) from builtins import range, object, open +import sys from copy import deepcopy from glob import glob import os import shutil -import sys from time import sleep, time -from traceback import format_exc +from traceback import format_exception import numpy as np import scipy.sparse as ssp @@ -159,7 +159,7 @@ def run(self, graph, config, updatehash=False): graph, result={ 'result': None, - 'traceback': format_exc() + 'traceback': '\n'.join(format_exception(*sys.exc_info())) })) else: if result: @@ -327,7 +327,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.proc_pending[jobid] = False else: self.pending_tasks.insert(0, (tid, jobid)) - logger.info('Finished submitting: %s ID: %d' % + logger.info('Finished submitting: %s ID: %d', (self.procs[jobid]._id, jobid)) else: break @@ -337,11 +337,17 @@ def _local_hash_check(self, jobid, graph): self.procs[jobid].config['execution']['local_hash_check']): return False - cached, updated = self.procs[jobid].is_cached() + try: + cached, updated = self.procs[jobid].is_cached() + except Exception: + logger.warning('Error while checking node hash, forcing re-run\n\n%s', + '\n'.join(format_exception(*sys.exc_info()))) + return False + logger.debug('Checking hash "%s" locally: cached=%s, updated=%s.', self.procs[jobid].fullname, cached, updated) overwrite = self.procs[jobid].overwrite - always_run = self.procs[jobid]._interface.always_run + always_run = self.procs[jobid].interface.always_run if cached and updated and (overwrite is False or overwrite is None and not always_run): @@ -351,8 +357,9 @@ def _local_hash_check(self, jobid, graph): self._task_finished_cb(jobid, cached=True) self._remove_node_dirs() except Exception: - logger.debug('Error skipping cached node %s (%s).', - self.procs[jobid]._id, jobid) + logger.debug('Error skipping cached node %s (%s).\n\n%s', + self.procs[jobid]._id, jobid, + '\n'.join(format_exception(*sys.exc_info()))) self._clean_queue(jobid, graph) self.proc_pending[jobid] = False return True @@ -481,7 +488,7 @@ def _get_result(self, taskid): taskid, timeout, node_dir)) raise IOError(error_message) except IOError as e: - result_data['traceback'] = format_exc() + result_data['traceback'] = '\n'.join(format_exception(*sys.exc_info())) else: results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0] result_data = loadpkl(results_file) From 4148881c04cd85636f9f7a3f48981ee9de9c4004 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Fri, 26 Jan 2018 13:10:24 -0800 Subject: [PATCH 2/5] add longer description to warning --- nipype/pipeline/plugins/base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index c48eec4353..6db128e845 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -340,7 +340,11 @@ def _local_hash_check(self, jobid, graph): try: cached, updated = self.procs[jobid].is_cached() except Exception: - logger.warning('Error while checking node hash, forcing re-run\n\n%s', + logger.warning('Error while checking node hash, forcing re-run. ' + 'Although this error may not prevent the workflow from running, ' + 'it could indicate a major problem. Please report a new issue' + 'at https://github.com/nipy/nipype/issues adding the following' + 'information:\n\n%s', '\n'.join(format_exception(*sys.exc_info()))) return False From 05613083b374c40bc8b4faf356d776cbb804a6be Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Fri, 26 Jan 2018 18:02:22 -0800 Subject: [PATCH 3/5] improve message, fix error in logging --- nipype/pipeline/plugins/base.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 6db128e845..de94e45313 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -244,7 +244,7 @@ def _submit_mapnode(self, jobid): mapnodesubids = self.procs[jobid].get_subnodes() numnodes = len(mapnodesubids) logger.debug('Adding %d jobs for mapnode %s', numnodes, - self.procs[jobid]._id) + self.procs[jobid].fullname) for i in range(numnodes): self.mapnodesubids[self.depidx.shape[0] + i] = jobid self.procs.extend(mapnodesubids) @@ -274,7 +274,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): slots = None else: slots = max(0, self.max_jobs - num_jobs) - logger.debug('Slots available: %s' % slots) + logger.debug('Slots available: %s', slots) if (num_jobs >= self.max_jobs) or (slots == 0): break @@ -303,14 +303,14 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.proc_done[jobid] = True self.proc_pending[jobid] = True # Send job to task manager and add to pending tasks - logger.info('Submitting: %s ID: %d' % - (self.procs[jobid]._id, jobid)) + logger.info('Submitting: %s ID: %d', + self.procs[jobid]._id, jobid) if self._status_callback: self._status_callback(self.procs[jobid], 'start') if not self._local_hash_check(jobid, graph): if self.procs[jobid].run_without_submitting: - logger.debug('Running node %s on master thread' % + logger.debug('Running node %s on master thread', self.procs[jobid]) try: self.procs[jobid].run() @@ -328,7 +328,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): else: self.pending_tasks.insert(0, (tid, jobid)) logger.info('Finished submitting: %s ID: %d', - (self.procs[jobid]._id, jobid)) + self.procs[jobid]._id, jobid) else: break @@ -340,12 +340,17 @@ def _local_hash_check(self, jobid, graph): try: cached, updated = self.procs[jobid].is_cached() except Exception: - logger.warning('Error while checking node hash, forcing re-run. ' - 'Although this error may not prevent the workflow from running, ' - 'it could indicate a major problem. Please report a new issue' - 'at https://github.com/nipy/nipype/issues adding the following' - 'information:\n\n%s', - '\n'.join(format_exception(*sys.exc_info()))) + logger.warning( + 'Error while checking node hash, forcing re-run. ' + 'Although this error may not prevent the workflow from running, ' + 'it could indicate a major problem. Please report a new issue' + 'at https://github.com/nipy/nipype/issues adding the following' + 'information:\n\n\tNode: %s\n\tInterface: %s.%s\n\tTraceback:\n%s', + self.procs[jobid].fullname, + self.procs[jobid].interface.__module__, + self.procs[jobid].interface.__class__.__name__, + '\n'.join(format_exception(*sys.exc_info())) + ) return False logger.debug('Checking hash "%s" locally: cached=%s, updated=%s.', From b3fa687e7d948d28c2628a280724f8463af05a3f Mon Sep 17 00:00:00 2001 From: oesteban Date: Mon, 29 Jan 2018 11:48:04 -0800 Subject: [PATCH 4/5] minor fixes --- nipype/pipeline/plugins/base.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index de94e45313..0bd3b8fc37 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -244,7 +244,7 @@ def _submit_mapnode(self, jobid): mapnodesubids = self.procs[jobid].get_subnodes() numnodes = len(mapnodesubids) logger.debug('Adding %d jobs for mapnode %s', numnodes, - self.procs[jobid].fullname) + self.procs[jobid]) for i in range(numnodes): self.mapnodesubids[self.depidx.shape[0] + i] = jobid self.procs.extend(mapnodesubids) @@ -343,10 +343,10 @@ def _local_hash_check(self, jobid, graph): logger.warning( 'Error while checking node hash, forcing re-run. ' 'Although this error may not prevent the workflow from running, ' - 'it could indicate a major problem. Please report a new issue' - 'at https://github.com/nipy/nipype/issues adding the following' + 'it could indicate a major problem. Please report a new issue ' + 'at https://github.com/nipy/nipype/issues adding the following ' 'information:\n\n\tNode: %s\n\tInterface: %s.%s\n\tTraceback:\n%s', - self.procs[jobid].fullname, + self.procs[jobid], self.procs[jobid].interface.__module__, self.procs[jobid].interface.__class__.__name__, '\n'.join(format_exception(*sys.exc_info())) @@ -354,20 +354,20 @@ def _local_hash_check(self, jobid, graph): return False logger.debug('Checking hash "%s" locally: cached=%s, updated=%s.', - self.procs[jobid].fullname, cached, updated) + self.procs[jobid], cached, updated) overwrite = self.procs[jobid].overwrite always_run = self.procs[jobid].interface.always_run if cached and updated and (overwrite is False or overwrite is None and not always_run): logger.debug('Skipping cached node %s with ID %s.', - self.procs[jobid]._id, jobid) + self.procs[jobid], jobid) try: self._task_finished_cb(jobid, cached=True) self._remove_node_dirs() except Exception: logger.debug('Error skipping cached node %s (%s).\n\n%s', - self.procs[jobid]._id, jobid, + self.procs[jobid], jobid, '\n'.join(format_exception(*sys.exc_info()))) self._clean_queue(jobid, graph) self.proc_pending[jobid] = False @@ -380,7 +380,7 @@ def _task_finished_cb(self, jobid, cached=False): This is called when a job is completed. """ logger.info('[Job %d] %s (%s).', jobid, 'Cached' - if cached else 'Completed', self.procs[jobid].fullname) + if cached else 'Completed', self.procs[jobid]) if self._status_callback: self._status_callback(self.procs[jobid], 'end') # Update job and worker queues From a3722aabeec881593c55767536eda299c68b3e14 Mon Sep 17 00:00:00 2001 From: oesteban Date: Mon, 29 Jan 2018 11:49:34 -0800 Subject: [PATCH 5/5] do not access _id on nodes for logging --- nipype/pipeline/plugins/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 0bd3b8fc37..ff84937bc6 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -304,7 +304,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.proc_pending[jobid] = True # Send job to task manager and add to pending tasks logger.info('Submitting: %s ID: %d', - self.procs[jobid]._id, jobid) + self.procs[jobid], jobid) if self._status_callback: self._status_callback(self.procs[jobid], 'start') @@ -328,7 +328,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): else: self.pending_tasks.insert(0, (tid, jobid)) logger.info('Finished submitting: %s ID: %d', - self.procs[jobid]._id, jobid) + self.procs[jobid], jobid) else: break