Skip to content

Commit dd29031

Browse files
committed
ENH: Avoid loading result from file when writing reports
Minimize the access to the ``result`` property when writing pre/post-execution reports. This modification should particularly preempt #3009 (comment)
1 parent 01de656 commit dd29031

File tree

2 files changed

+46
-54
lines changed

2 files changed

+46
-54
lines changed

nipype/pipeline/engine/nodes.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from .utils import (
3838
_parameterization_dir, save_hashfile as _save_hashfile, load_resultfile as
3939
_load_resultfile, save_resultfile as _save_resultfile, nodelist_runner as
40-
_node_runner, strip_temp as _strip_temp, write_report,
40+
_node_runner, strip_temp as _strip_temp, write_node_report,
4141
clean_working_directory, merge_dict, evaluate_connect_function)
4242
from .base import EngineBase
4343

@@ -464,8 +464,7 @@ def run(self, updatehash=False):
464464

465465
# Store runtime-hashfile, pre-execution report, the node and the inputs set.
466466
_save_hashfile(hashfile_unfinished, self._hashed_inputs)
467-
write_report(
468-
self, report_type='preexec', is_mapnode=isinstance(self, MapNode))
467+
write_node_report(self, is_mapnode=isinstance(self, MapNode))
469468
savepkl(op.join(outdir, '_node.pklz'), self)
470469
savepkl(op.join(outdir, '_inputs.pklz'), self.inputs.get_traitsfree())
471470

@@ -484,8 +483,7 @@ def run(self, updatehash=False):
484483
# Tear-up after success
485484
shutil.move(hashfile_unfinished,
486485
hashfile_unfinished.replace('_unfinished', ''))
487-
write_report(
488-
self, report_type='postexec', is_mapnode=isinstance(self, MapNode))
486+
write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode))
489487
logger.info('[Node] Finished "%s".', self.fullname)
490488
return result
491489

@@ -1204,7 +1202,7 @@ def get_subnodes(self):
12041202
"""Generate subnodes of a mapnode and write pre-execution report"""
12051203
self._get_inputs()
12061204
self._check_iterfield()
1207-
write_report(self, report_type='preexec', is_mapnode=True)
1205+
write_node_report(self, result=None, is_mapnode=True)
12081206
return [node for _, node in self._make_nodes()]
12091207

12101208
def num_subnodes(self):

nipype/pipeline/engine/utils.py

Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -116,68 +116,60 @@ def nodelist_runner(nodes, updatehash=False, stop_first=False):
116116
yield i, result, err
117117

118118

119-
def write_report(node, report_type=None, is_mapnode=False):
120-
"""Write a report file for a node"""
119+
def write_node_report(node, result=None, is_mapnode=False):
120+
"""Write a report file for a node."""
121121
if not str2bool(node.config['execution']['create_report']):
122122
return
123123

124-
if report_type not in ['preexec', 'postexec']:
125-
logger.warning('[Node] Unknown report type "%s".', report_type)
126-
return
127-
128124
cwd = node.output_dir()
129-
report_dir = os.path.join(cwd, '_report')
130-
report_file = os.path.join(report_dir, 'report.rst')
131-
makedirs(report_dir, exist_ok=True)
132-
133-
logger.debug('[Node] Writing %s-exec report to "%s"', report_type[:-4],
134-
report_file)
135-
if report_type.startswith('pre'):
136-
lines = [
137-
write_rst_header('Node: %s' % get_print_name(node), level=0),
138-
write_rst_list(
139-
['Hierarchy : %s' % node.fullname,
140-
'Exec ID : %s' % node._id]),
141-
write_rst_header('Original Inputs', level=1),
142-
write_rst_dict(node.inputs.trait_get()),
143-
]
144-
with open(report_file, 'wt') as fp:
145-
fp.write('\n'.join(lines))
146-
return
125+
report_file = Path(cwd) / '_report' / 'report.rst'
126+
report_file.parent.mkdir(exist_ok=True)
147127

148128
lines = [
129+
write_rst_header('Node: %s' % get_print_name(node), level=0),
130+
write_rst_list(
131+
['Hierarchy : %s' % node.fullname,
132+
'Exec ID : %s' % node._id]),
133+
write_rst_header('Original Inputs', level=1),
134+
write_rst_dict(node.inputs.trait_get()),
135+
]
136+
137+
if result is None:
138+
logger.debug('[Node] Writing pre-exec report to "%s"', report_file)
139+
report_file.write_text('\n'.join(lines))
140+
return
141+
142+
logger.debug('[Node] Writing post-exec report to "%s"', report_file)
143+
lines += [
149144
write_rst_header('Execution Inputs', level=1),
150145
write_rst_dict(node.inputs.trait_get()),
146+
write_rst_header('Execution Outputs', level=1)
151147
]
152148

153-
result = node.result # Locally cache result
154149
outputs = result.outputs
155-
156150
if outputs is None:
157-
with open(report_file, 'at') as fp:
158-
fp.write('\n'.join(lines))
151+
lines += ['None']
152+
report_file.write_text('\n'.join(lines))
159153
return
160154

161-
lines.append(write_rst_header('Execution Outputs', level=1))
162-
163155
if isinstance(outputs, Bunch):
164156
lines.append(write_rst_dict(outputs.dictcopy()))
165157
elif outputs:
166158
lines.append(write_rst_dict(outputs.trait_get()))
159+
else:
160+
lines += ['Outputs object was empty.']
167161

168162
if is_mapnode:
169163
lines.append(write_rst_header('Subnode reports', level=1))
170164
nitems = len(ensure_list(getattr(node.inputs, node.iterfield[0])))
171165
subnode_report_files = []
172166
for i in range(nitems):
173-
nodecwd = os.path.join(cwd, 'mapflow', '_%s%d' % (node.name, i),
174-
'_report', 'report.rst')
175-
subnode_report_files.append('subnode %d : %s' % (i, nodecwd))
167+
subnode_file = Path(cwd) / 'mapflow' / (
168+
'_%s%d' % (node.name, i)) / '_report' / 'report.rst'
169+
subnode_report_files.append('subnode %d : %s' % (i, subnode_file))
176170

177171
lines.append(write_rst_list(subnode_report_files))
178-
179-
with open(report_file, 'at') as fp:
180-
fp.write('\n'.join(lines))
172+
report_file.write_text('\n'.join(lines))
181173
return
182174

183175
lines.append(write_rst_header('Runtime info', level=1))
@@ -189,15 +181,9 @@ def write_report(node, report_type=None, is_mapnode=False):
189181
'prev_wd': getattr(result.runtime, 'prevcwd', '<not-set>'),
190182
}
191183

192-
if hasattr(result.runtime, 'cmdline'):
193-
rst_dict['command'] = result.runtime.cmdline
194-
195-
# Try and insert memory/threads usage if available
196-
if hasattr(result.runtime, 'mem_peak_gb'):
197-
rst_dict['mem_peak_gb'] = result.runtime.mem_peak_gb
198-
199-
if hasattr(result.runtime, 'cpu_percent'):
200-
rst_dict['cpu_percent'] = result.runtime.cpu_percent
184+
for prop in ('cmdline', 'mem_peak_gb', 'cpu_percent'):
185+
if hasattr(result.runtime, prop):
186+
rst_dict[prop] = getattr(result.runtime, prop)
201187

202188
lines.append(write_rst_dict(rst_dict))
203189

@@ -225,9 +211,17 @@ def write_report(node, report_type=None, is_mapnode=False):
225211
write_rst_dict(result.runtime.environ),
226212
]
227213

228-
with open(report_file, 'at') as fp:
229-
fp.write('\n'.join(lines))
230-
return
214+
report_file.write_text('\n'.join(lines))
215+
216+
217+
def write_report(node, report_type=None, is_mapnode=False):
218+
"""Write a report file for a node - DEPRECATED"""
219+
if report_type not in ('preexec', 'postexec'):
220+
logger.warning('[Node] Unknown report type "%s".', report_type)
221+
return
222+
223+
write_node_report(node, is_mapnode=is_mapnode,
224+
result=node.result if report_type == 'postexec' else None)
231225

232226

233227
def save_resultfile(result, cwd, name, rebase=None):

0 commit comments

Comments
 (0)