From 47ba1efb372e06d57a9dc18cd4b638d6c60e11dc Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 28 Nov 2018 15:35:51 -0800 Subject: [PATCH 1/2] [FIX] ``status_callback`` not called with ``stop_on_first_crash`` Some tests were randomly failing in Travis for the Linear plugin. My suspiction is that some other test changed the configuration of ``stop_on_first_crash`` and depending on the ordering tests were actually run, this test would sometimes fail, apparently at random. The tests have been expanded to test also LegacyMultiProc and to check under both conditions (stop_on_first_crash on/off). --- nipype/pipeline/plugins/linear.py | 18 ++-- .../pipeline/plugins/tests/test_callback.py | 88 +++++-------------- 2 files changed, 35 insertions(+), 71 deletions(-) diff --git a/nipype/pipeline/plugins/linear.py b/nipype/pipeline/plugins/linear.py index 9863c0f6c6..380aa2ae0b 100644 --- a/nipype/pipeline/plugins/linear.py +++ b/nipype/pipeline/plugins/linear.py @@ -36,16 +36,15 @@ def run(self, graph, config, updatehash=False): donotrun = [] nodes, _ = topological_sort(graph) for node in nodes: + endstatus = 'end' try: if node in donotrun: continue if self._status_callback: self._status_callback(node, 'start') node.run(updatehash=updatehash) - if self._status_callback: - self._status_callback(node, 'end') except: - os.chdir(old_wd) + endstatus = 'exception' # bare except, but i really don't know where a # node might fail crashfile = report_crash(node) @@ -53,9 +52,16 @@ def run(self, graph, config, updatehash=False): raise # remove dependencies from queue subnodes = [s for s in dfs_preorder(graph, node)] - notrun.append( - dict(node=node, dependents=subnodes, crashfile=crashfile)) + notrun.append({'node': node, 'dependents': subnodes, + 'crashfile': crashfile}) donotrun.extend(subnodes) + # Delay raising the crash until we cleaned the house + if str2bool(config['execution']['stop_on_first_crash']): + report_nodes_not_run(notrun) # report before raising + raise + finally: + # Return wherever we were before + os.chdir(old_wd) if self._status_callback: - self._status_callback(node, 'exception') + self._status_callback(node, endstatus) report_nodes_not_run(notrun) diff --git a/nipype/pipeline/plugins/tests/test_callback.py b/nipype/pipeline/plugins/tests/test_callback.py index 29c5cbd404..6b9525071e 100644 --- a/nipype/pipeline/plugins/tests/test_callback.py +++ b/nipype/pipeline/plugins/tests/test_callback.py @@ -6,8 +6,8 @@ from builtins import object +from time import sleep import pytest -import sys import nipype.interfaces.utility as niu import nipype.pipeline.engine as pe @@ -25,10 +25,11 @@ def __init__(self): self.statuses = [] def callback(self, node, status, result=None): - self.statuses.append((node, status)) + self.statuses.append((node.name, status)) -def test_callback_normal(tmpdir): +@pytest.mark.parametrize("plugin", ['Linear', 'MultiProc', 'LegacyMultiProc']) +def test_callback_normal(tmpdir, plugin): tmpdir.chdir() so = Status() @@ -37,16 +38,17 @@ def test_callback_normal(tmpdir): niu.Function(function=func, input_names=[], output_names=[]), name='f_node') wf.add_nodes([f_node]) - wf.config['execution'] = {'crashdump_dir': wf.base_dir} - wf.run(plugin="Linear", plugin_args={'status_callback': so.callback}) - assert len(so.statuses) == 2 - for (n, s) in so.statuses: - assert n.name == 'f_node' - assert so.statuses[0][1] == 'start' - assert so.statuses[1][1] == 'end' + wf.config['execution'] = { + 'crashdump_dir': wf.base_dir, + 'poll_sleep_duration': 2 + } + wf.run(plugin=plugin, plugin_args={'status_callback': so.callback}) + assert so.statuses == [('f_node', 'start'), ('f_node', 'end')] -def test_callback_exception(tmpdir): +@pytest.mark.parametrize("plugin", ['Linear', 'MultiProc', 'LegacyMultiProc']) +@pytest.mark.parametrize("stop_on_first_crash", [False, True]) +def test_callback_exception(tmpdir, plugin, stop_on_first_crash): tmpdir.chdir() so = Status() @@ -55,57 +57,13 @@ def test_callback_exception(tmpdir): niu.Function(function=bad_func, input_names=[], output_names=[]), name='f_node') wf.add_nodes([f_node]) - wf.config['execution'] = {'crashdump_dir': wf.base_dir} - try: - wf.run(plugin="Linear", plugin_args={'status_callback': so.callback}) - except: - pass - assert len(so.statuses) == 2 - for (n, s) in so.statuses: - assert n.name == 'f_node' - assert so.statuses[0][1] == 'start' - assert so.statuses[1][1] == 'exception' - - -def test_callback_multiproc_normal(tmpdir): - tmpdir.chdir() - - so = Status() - wf = pe.Workflow(name='test', base_dir=tmpdir.strpath) - f_node = pe.Node( - niu.Function(function=func, input_names=[], output_names=[]), - name='f_node') - wf.add_nodes([f_node]) - wf.config['execution']['crashdump_dir'] = wf.base_dir - wf.config['execution']['poll_sleep_duration'] = 2 - wf.run(plugin='MultiProc', plugin_args={'status_callback': so.callback}) - assert len(so.statuses) == 2 - for (n, s) in so.statuses: - assert n.name == 'f_node' - assert so.statuses[0][1] == 'start' - assert so.statuses[1][1] == 'end' - - -def test_callback_multiproc_exception(tmpdir): - tmpdir.chdir() - - so = Status() - wf = pe.Workflow(name='test', base_dir=tmpdir.strpath) - f_node = pe.Node( - niu.Function(function=bad_func, input_names=[], output_names=[]), - name='f_node') - wf.add_nodes([f_node]) - wf.config['execution'] = {'crashdump_dir': wf.base_dir} - - try: - wf.run( - plugin='MultiProc', plugin_args={ - 'status_callback': so.callback - }) - except: - pass - assert len(so.statuses) == 2 - for (n, s) in so.statuses: - assert n.name == 'f_node' - assert so.statuses[0][1] == 'start' - assert so.statuses[1][1] == 'exception' + wf.config['execution'] = { + 'crashdump_dir': wf.base_dir, + 'stop_on_first_crash': stop_on_first_crash, + 'poll_sleep_duration': 2 + } + with pytest.raises(Exception): + wf.run(plugin=plugin, plugin_args={'status_callback': so.callback}) + + sleep(0.5) # Wait for callback to be called (python 2.7) + assert so.statuses == [('f_node', 'start'), ('f_node', 'exception')] From 491c123af76dee72714bb52c278458ab812bea3c Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 28 Nov 2018 15:45:14 -0800 Subject: [PATCH 2/2] do not chdir after all nodes are run --- nipype/pipeline/plugins/linear.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/plugins/linear.py b/nipype/pipeline/plugins/linear.py index 380aa2ae0b..41f5c998fe 100644 --- a/nipype/pipeline/plugins/linear.py +++ b/nipype/pipeline/plugins/linear.py @@ -57,11 +57,12 @@ def run(self, graph, config, updatehash=False): donotrun.extend(subnodes) # Delay raising the crash until we cleaned the house if str2bool(config['execution']['stop_on_first_crash']): + os.chdir(old_wd) # Return wherever we were before report_nodes_not_run(notrun) # report before raising raise finally: - # Return wherever we were before - os.chdir(old_wd) if self._status_callback: self._status_callback(node, endstatus) + + os.chdir(old_wd) # Return wherever we were before report_nodes_not_run(notrun)