From dd2903151e54fbc344326e897a02b9717c1da9f8 Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 10 Sep 2019 10:42:00 -0700 Subject: [PATCH 1/4] ENH: Avoid loading result from file when writing reports Minimize the access to the ``result`` property when writing pre/post-execution reports. This modification should particularly preempt https://github.com/nipy/nipype/issues/3009#issuecomment-529799338 --- nipype/pipeline/engine/nodes.py | 10 ++-- nipype/pipeline/engine/utils.py | 90 +++++++++++++++------------------ 2 files changed, 46 insertions(+), 54 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 57c566f890..c3b132b5c6 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -37,7 +37,7 @@ from .utils import ( _parameterization_dir, save_hashfile as _save_hashfile, load_resultfile as _load_resultfile, save_resultfile as _save_resultfile, nodelist_runner as - _node_runner, strip_temp as _strip_temp, write_report, + _node_runner, strip_temp as _strip_temp, write_node_report, clean_working_directory, merge_dict, evaluate_connect_function) from .base import EngineBase @@ -464,8 +464,7 @@ def run(self, updatehash=False): # Store runtime-hashfile, pre-execution report, the node and the inputs set. _save_hashfile(hashfile_unfinished, self._hashed_inputs) - write_report( - self, report_type='preexec', is_mapnode=isinstance(self, MapNode)) + write_node_report(self, is_mapnode=isinstance(self, MapNode)) savepkl(op.join(outdir, '_node.pklz'), self) savepkl(op.join(outdir, '_inputs.pklz'), self.inputs.get_traitsfree()) @@ -484,8 +483,7 @@ def run(self, updatehash=False): # Tear-up after success shutil.move(hashfile_unfinished, hashfile_unfinished.replace('_unfinished', '')) - write_report( - self, report_type='postexec', is_mapnode=isinstance(self, MapNode)) + write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode)) logger.info('[Node] Finished "%s".', self.fullname) return result @@ -1204,7 +1202,7 @@ def get_subnodes(self): """Generate subnodes of a mapnode and write pre-execution report""" self._get_inputs() self._check_iterfield() - write_report(self, report_type='preexec', is_mapnode=True) + write_node_report(self, result=None, is_mapnode=True) return [node for _, node in self._make_nodes()] def num_subnodes(self): diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index d22150b7b8..c8e17e7c65 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -116,68 +116,60 @@ def nodelist_runner(nodes, updatehash=False, stop_first=False): yield i, result, err -def write_report(node, report_type=None, is_mapnode=False): - """Write a report file for a node""" +def write_node_report(node, result=None, is_mapnode=False): + """Write a report file for a node.""" if not str2bool(node.config['execution']['create_report']): return - if report_type not in ['preexec', 'postexec']: - logger.warning('[Node] Unknown report type "%s".', report_type) - return - cwd = node.output_dir() - report_dir = os.path.join(cwd, '_report') - report_file = os.path.join(report_dir, 'report.rst') - makedirs(report_dir, exist_ok=True) - - logger.debug('[Node] Writing %s-exec report to "%s"', report_type[:-4], - report_file) - if report_type.startswith('pre'): - lines = [ - write_rst_header('Node: %s' % get_print_name(node), level=0), - write_rst_list( - ['Hierarchy : %s' % node.fullname, - 'Exec ID : %s' % node._id]), - write_rst_header('Original Inputs', level=1), - write_rst_dict(node.inputs.trait_get()), - ] - with open(report_file, 'wt') as fp: - fp.write('\n'.join(lines)) - return + report_file = Path(cwd) / '_report' / 'report.rst' + report_file.parent.mkdir(exist_ok=True) lines = [ + write_rst_header('Node: %s' % get_print_name(node), level=0), + write_rst_list( + ['Hierarchy : %s' % node.fullname, + 'Exec ID : %s' % node._id]), + write_rst_header('Original Inputs', level=1), + write_rst_dict(node.inputs.trait_get()), + ] + + if result is None: + logger.debug('[Node] Writing pre-exec report to "%s"', report_file) + report_file.write_text('\n'.join(lines)) + return + + logger.debug('[Node] Writing post-exec report to "%s"', report_file) + lines += [ write_rst_header('Execution Inputs', level=1), write_rst_dict(node.inputs.trait_get()), + write_rst_header('Execution Outputs', level=1) ] - result = node.result # Locally cache result outputs = result.outputs - if outputs is None: - with open(report_file, 'at') as fp: - fp.write('\n'.join(lines)) + lines += ['None'] + report_file.write_text('\n'.join(lines)) return - lines.append(write_rst_header('Execution Outputs', level=1)) - if isinstance(outputs, Bunch): lines.append(write_rst_dict(outputs.dictcopy())) elif outputs: lines.append(write_rst_dict(outputs.trait_get())) + else: + lines += ['Outputs object was empty.'] if is_mapnode: lines.append(write_rst_header('Subnode reports', level=1)) nitems = len(ensure_list(getattr(node.inputs, node.iterfield[0]))) subnode_report_files = [] for i in range(nitems): - nodecwd = os.path.join(cwd, 'mapflow', '_%s%d' % (node.name, i), - '_report', 'report.rst') - subnode_report_files.append('subnode %d : %s' % (i, nodecwd)) + subnode_file = Path(cwd) / 'mapflow' / ( + '_%s%d' % (node.name, i)) / '_report' / 'report.rst' + subnode_report_files.append('subnode %d : %s' % (i, subnode_file)) lines.append(write_rst_list(subnode_report_files)) - - with open(report_file, 'at') as fp: - fp.write('\n'.join(lines)) + report_file.write_text('\n'.join(lines)) return lines.append(write_rst_header('Runtime info', level=1)) @@ -189,15 +181,9 @@ def write_report(node, report_type=None, is_mapnode=False): 'prev_wd': getattr(result.runtime, 'prevcwd', ''), } - if hasattr(result.runtime, 'cmdline'): - rst_dict['command'] = result.runtime.cmdline - - # Try and insert memory/threads usage if available - if hasattr(result.runtime, 'mem_peak_gb'): - rst_dict['mem_peak_gb'] = result.runtime.mem_peak_gb - - if hasattr(result.runtime, 'cpu_percent'): - rst_dict['cpu_percent'] = result.runtime.cpu_percent + for prop in ('cmdline', 'mem_peak_gb', 'cpu_percent'): + if hasattr(result.runtime, prop): + rst_dict[prop] = getattr(result.runtime, prop) lines.append(write_rst_dict(rst_dict)) @@ -225,9 +211,17 @@ def write_report(node, report_type=None, is_mapnode=False): write_rst_dict(result.runtime.environ), ] - with open(report_file, 'at') as fp: - fp.write('\n'.join(lines)) - return + report_file.write_text('\n'.join(lines)) + + +def write_report(node, report_type=None, is_mapnode=False): + """Write a report file for a node - DEPRECATED""" + if report_type not in ('preexec', 'postexec'): + logger.warning('[Node] Unknown report type "%s".', report_type) + return + + write_node_report(node, is_mapnode=is_mapnode, + result=node.result if report_type == 'postexec' else None) def save_resultfile(result, cwd, name, rebase=None): From 8bfef03f483e9175742957ad969f7a6575a42adc Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 10 Sep 2019 17:55:32 -0700 Subject: [PATCH 2/4] fix: address failing tests, add tests for patched Path --- nipype/pipeline/engine/utils.py | 2 +- nipype/utils/filemanip.py | 20 ++++++++++++++++++++ nipype/utils/tests/test_filemanip.py | 18 ++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/utils.py b/nipype/pipeline/engine/utils.py index c8e17e7c65..0df39e2a5a 100644 --- a/nipype/pipeline/engine/utils.py +++ b/nipype/pipeline/engine/utils.py @@ -123,7 +123,7 @@ def write_node_report(node, result=None, is_mapnode=False): cwd = node.output_dir() report_file = Path(cwd) / '_report' / 'report.rst' - report_file.parent.mkdir(exist_ok=True) + report_file.parent.mkdir(exist_ok=True, parents=True) lines = [ write_rst_header('Node: %s' % get_print_name(node), level=0), diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index 35b3de219b..a2ae0b09d6 100644 --- a/nipype/utils/filemanip.py +++ b/nipype/utils/filemanip.py @@ -60,6 +60,8 @@ def __init__(self, path): try: Path('/invented/file/path').resolve(strict=True) except TypeError: + from tempfile import gettempdir + def _patch_resolve(self, strict=False): """Add the argument strict to signature in Python>3,<3.6.""" resolved = Path().old_resolve() / self @@ -70,6 +72,24 @@ def _patch_resolve(self, strict=False): Path.old_resolve = Path.resolve Path.resolve = _patch_resolve + + if not hasattr(Path, 'write_text'): + def _write_text(self, text): + with open(str(self), 'w') as f: + f.write(text) + Path.write_text = _write_text + + try: + (Path(gettempdir()) / 'exist_ok_test').mkdir(exist_ok=True) + except TypeError: + def _mkdir(self, mode=0o777, parents=False, exist_ok=False): + if not exist_ok and self.exists(): + raise FileExistsError(str(self)) + if not parents and not Path(str(self.parents)).exists(): + raise FileNotFoundError(str(self.parents)) + os.makedirs(str(self), mode=mode, exist_ok=exist_ok) + Path.mkdir = _mkdir + except FileNotFoundError: pass except OSError: diff --git a/nipype/utils/tests/test_filemanip.py b/nipype/utils/tests/test_filemanip.py index 4ec78e1984..6bae92203b 100644 --- a/nipype/utils/tests/test_filemanip.py +++ b/nipype/utils/tests/test_filemanip.py @@ -596,3 +596,21 @@ def test_pickle(tmp_path, save_versioning): savepkl(pickle_fname, testobj, versioning=save_versioning) outobj = loadpkl(pickle_fname) assert outobj == testobj + + +def test_Path(tmpdir): + tmp_path = Path(tmpdir.strpath) + + assert hasattr(tmp_path, 'write_text') + + (tmp_path / 'textfile').write_text('some text') + + with pytest.raises(OSError): + (tmp_path / 'no' / 'parents').mkdir(parents=False) + + (tmp_path / 'no' / 'parents').mkdir(parents=True) + + with pytest.raises(OSError): + (tmp_path / 'no' / 'parents').mkdir(parents=False) + + (tmp_path / 'no' / 'parents').mkdir(parents=True, exist_ok=True) From a62a369c22e83fbd37fe88a4a84503d756f4aa9d Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Wed, 11 Sep 2019 09:42:27 -0700 Subject: [PATCH 3/4] Update nipype/utils/filemanip.py [skip ci] Co-Authored-By: Chris Markiewicz --- nipype/utils/filemanip.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index a2ae0b09d6..541b3f62b9 100644 --- a/nipype/utils/filemanip.py +++ b/nipype/utils/filemanip.py @@ -80,7 +80,8 @@ def _write_text(self, text): Path.write_text = _write_text try: - (Path(gettempdir()) / 'exist_ok_test').mkdir(exist_ok=True) + with tempfile.TemporaryDirectory() as tmpdir: + (Path(tmpdir) / 'exist_ok_test').mkdir(exist_ok=True) except TypeError: def _mkdir(self, mode=0o777, parents=False, exist_ok=False): if not exist_ok and self.exists(): From c120ee5e069df1becaee1d7e98af3681df883861 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 11 Sep 2019 11:02:24 -0700 Subject: [PATCH 4/4] fix: address @effigies' review comments --- nipype/utils/filemanip.py | 45 ++++++++++++++-------------- nipype/utils/tests/test_filemanip.py | 2 -- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/nipype/utils/filemanip.py b/nipype/utils/filemanip.py index 541b3f62b9..c919000d34 100644 --- a/nipype/utils/filemanip.py +++ b/nipype/utils/filemanip.py @@ -57,11 +57,9 @@ def __init__(self, path): from pathlib2 import Path USING_PATHLIB2 = True -try: +try: # PY35 - strict mode was added in 3.6 Path('/invented/file/path').resolve(strict=True) except TypeError: - from tempfile import gettempdir - def _patch_resolve(self, strict=False): """Add the argument strict to signature in Python>3,<3.6.""" resolved = Path().old_resolve() / self @@ -72,28 +70,10 @@ def _patch_resolve(self, strict=False): Path.old_resolve = Path.resolve Path.resolve = _patch_resolve - - if not hasattr(Path, 'write_text'): - def _write_text(self, text): - with open(str(self), 'w') as f: - f.write(text) - Path.write_text = _write_text - - try: - with tempfile.TemporaryDirectory() as tmpdir: - (Path(tmpdir) / 'exist_ok_test').mkdir(exist_ok=True) - except TypeError: - def _mkdir(self, mode=0o777, parents=False, exist_ok=False): - if not exist_ok and self.exists(): - raise FileExistsError(str(self)) - if not parents and not Path(str(self.parents)).exists(): - raise FileNotFoundError(str(self.parents)) - os.makedirs(str(self), mode=mode, exist_ok=exist_ok) - Path.mkdir = _mkdir - except FileNotFoundError: pass except OSError: + # PY2 def _patch_resolve(self, strict=False): """Raise FileNotFoundError instead of OSError with pathlib2.""" try: @@ -106,6 +86,27 @@ def _patch_resolve(self, strict=False): Path.old_resolve = Path.resolve Path.resolve = _patch_resolve +if not hasattr(Path, 'write_text'): + # PY34 - Path does not have write_text + def _write_text(self, text): + with open(str(self), 'w') as f: + f.write(text) + Path.write_text = _write_text + +if PY3: + try: # PY34 - mkdir does not have exist_ok + from tempfile import TemporaryDirectory + with TemporaryDirectory() as tmpdir: + (Path(tmpdir) / 'exist_ok_test').mkdir(exist_ok=True) + except TypeError: + def _mkdir(self, mode=0o777, parents=False, exist_ok=False): + if parents: + os.makedirs(str(self), mode=mode, exist_ok=exist_ok) + elif not exist_ok or not self.exists(): + os.mkdir(str(self), mode=mode) + + Path.mkdir = _mkdir + def split_filename(fname): """Split a filename into parts: path, base filename and extension. diff --git a/nipype/utils/tests/test_filemanip.py b/nipype/utils/tests/test_filemanip.py index 6bae92203b..9ee2e4c0ba 100644 --- a/nipype/utils/tests/test_filemanip.py +++ b/nipype/utils/tests/test_filemanip.py @@ -601,8 +601,6 @@ def test_pickle(tmp_path, save_versioning): def test_Path(tmpdir): tmp_path = Path(tmpdir.strpath) - assert hasattr(tmp_path, 'write_text') - (tmp_path / 'textfile').write_text('some text') with pytest.raises(OSError):