From 6d429993e4c1c63a39ebffda474b4f27124f4971 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 22 Nov 2017 11:15:12 -0800 Subject: [PATCH 1/3] [FIX] MultiProc mishandling crashes Fixes #2300 --- nipype/pipeline/plugins/base.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index bab2812903..cfa4af4645 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -197,13 +197,18 @@ def _get_result(self, taskid): def _submit_job(self, node, updatehash=False): raise NotImplementedError - def _report_crash(self, node, result=None): - tb = None + def _report_crash(self, node, result=None, traceback=None): + # Overwrite traceback if comes with result + # to keep compatibility if result is not None: node._result = result['result'] - tb = result['traceback'] - node._traceback = tb - return report_crash(node, traceback=tb) + if 'traceback' in result: + traceback = result['traceback'] + + if traceback is not None: + node._traceback = traceback + + return report_crash(node, traceback=traceback) def _clear_task(self, taskid): raise NotImplementedError From b34b00015762ccf5cd1e9e30794d101e4ccc8b9d Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 22 Nov 2017 11:24:09 -0800 Subject: [PATCH 2/3] Update CHANGES [skip ci] --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index f7761f7b91..a4faa2fd69 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,7 @@ Upcoming release (0.14.0) ================ +* FIX: MultiProc mishandling crashes (https://github.com/nipy/nipype/pull/2301) * FIX: Testing maintainance and improvements (https://github.com/nipy/nipype/pull/2252) * ENH: Add elapsed_time and final metric_value to ants.Registration (https://github.com/nipy/nipype/pull/1985) * ENH: Improve terminal_output feature (https://github.com/nipy/nipype/pull/2209) From 47fa79025457805fd5f010bb8c6b8af7a1a3a0d4 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 22 Nov 2017 12:13:28 -0800 Subject: [PATCH 3/3] revert changing _report_crash signature --- nipype/pipeline/plugins/base.py | 15 +++++---------- nipype/pipeline/plugins/multiproc.py | 13 +++++++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index cfa4af4645..bab2812903 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -197,18 +197,13 @@ def _get_result(self, taskid): def _submit_job(self, node, updatehash=False): raise NotImplementedError - def _report_crash(self, node, result=None, traceback=None): - # Overwrite traceback if comes with result - # to keep compatibility + def _report_crash(self, node, result=None): + tb = None if result is not None: node._result = result['result'] - if 'traceback' in result: - traceback = result['traceback'] - - if traceback is not None: - node._traceback = traceback - - return report_crash(node, traceback=traceback) + tb = result['traceback'] + node._traceback = tb + return report_crash(node, traceback=tb) def _clear_task(self, taskid): raise NotImplementedError diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 595b0e1947..4a294c89a6 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -238,8 +238,10 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): num_subnodes = self.procs[jobid].num_subnodes() except Exception: traceback = format_exception(*sys.exc_info()) - self._report_crash(self.procs[jobid], traceback=traceback) - self._clean_queue(jobid, graph) + self._clean_queue( + jobid, graph, + result={'result': None, 'traceback': traceback} + ) self.proc_pending[jobid] = False continue if num_subnodes > 1: @@ -275,10 +277,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): logger.debug('Running node %s on master thread', self.procs[jobid]) try: - self.procs[jobid].run() + self.procs[jobid].run(updatehash=updatehash) except Exception: traceback = format_exception(*sys.exc_info()) - self._report_crash(self.procs[jobid], traceback=traceback) + self._clean_queue( + jobid, graph, + result={'result': None, 'traceback': traceback} + ) # Release resources self._task_finished_cb(jobid)