Commit 9b0442a

Merge pull request #1372 from FCP-INDI/resource_multiproc
ResourceMultiProc plugin and runtime profiler
2 parents b89294f + 9ee881f commit 9b0442a

18 files changed (+2019, -44 lines)

doc/users/images/gantt_chart.png (107 KB)
doc/users/resource_sched_profiler.rst

Lines changed: 160 additions & 0 deletions
@@ -0,0 +1,160 @@
.. _resource_sched_profiler:

=============================================
Resource Scheduling and Profiling with Nipype
=============================================

The latest version of Nipype supports system resource scheduling and profiling.
These features allow users to ensure high throughput of their data processing
while also controlling the amount of computing resources a given workflow will
use.


Specifying Resources in the Node Interface
==========================================

Each ``Node`` instance interface has two parameters that specify its expected
thread and memory usage: ``num_threads`` and ``estimated_memory_gb``. If a
particular node is expected to use 8 threads and 2 GB of memory:

::

    import nipype.pipeline.engine as pe
    # any Nipype interface instance can be passed as ``interface``
    node = pe.Node(interface=my_interface, name='my_node')
    node.interface.num_threads = 8
    node.interface.estimated_memory_gb = 2

If the resource parameters are never set, they default to 1 thread and 1 GB of
RAM.


Resource Scheduler
==================

The ``MultiProc`` workflow plugin schedules node execution based on the
resources used by the currently running nodes and the total resources available
to the workflow. The plugin uses the plugin arguments ``n_procs`` and
``memory_gb`` to set the maximum resources a workflow can consume. To limit a
workflow to 8 cores and 10 GB of RAM:

::

    args_dict = {'n_procs' : 8, 'memory_gb' : 10}
    workflow.run(plugin='MultiProc', plugin_args=args_dict)

If these values are not explicitly set, the plugin will assume it can use all
of the processors and memory on the system. For example, if the machine has 16
cores and 12 GB of RAM, the workflow will internally assume those values for
``n_procs`` and ``memory_gb``, respectively.

The plugin will then queue eligible nodes for execution based on their expected
usage via the ``num_threads`` and ``estimated_memory_gb`` interface parameters.
If the plugin sees that only 3 of its 8 processors and 4 GB of its 10 GB of RAM
are being used by running nodes, it will attempt to execute the next available
node as long as its ``num_threads <= 5`` and ``estimated_memory_gb <= 6``. If
this is not the case, it will continue to check every available node in the
queue until it finds one that meets these conditions, or it waits for an
executing node to finish and free up the necessary resources. The queue gives
highest priority to nodes with the largest ``estimated_memory_gb``, followed
by nodes with the most expected ``num_threads``.


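The eligibility check described above can be sketched in a few lines of
Python. This is a simplified illustration of the scheduling rule, not
``MultiProc``'s actual implementation; the node tuples and the
``next_eligible`` helper are hypothetical stand-ins.

::

    def next_eligible(queue, free_procs, free_memory_gb):
        """Return the first queued node name that fits the free resources.

        The queue is assumed to be pre-sorted by estimated_memory_gb, then
        num_threads, both descending (highest demand first).
        """
        for name, num_threads, estimated_memory_gb in queue:
            if num_threads <= free_procs and estimated_memory_gb <= free_memory_gb:
                return name
        return None  # nothing fits; wait for a running node to finish

    # 8 processors / 10 GB total, with 3 procs and 4 GB already in use:
    queue = [('big_mem_node', 2, 8), ('fits_node', 4, 3)]
    chosen = next_eligible(queue, free_procs=8 - 3, free_memory_gb=10 - 4)
    # chosen == 'fits_node': big_mem_node fits on threads but needs 8 GB > 6 GB free
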
Runtime Profiler and using the Callback Log
===========================================

It is not always easy to estimate the amount of resources a particular function
or command uses. To help with this, Nipype provides some feedback about the
system resources used by every node during workflow execution via the built-in
runtime profiler. The runtime profiler is automatically enabled if the
psutil_ Python package is installed and found on the system.

.. _psutil: https://pythonhosted.org/psutil/

If the package is not found, the workflow will run normally without the runtime
profiler.

The runtime profiler records the number of threads and the amount of memory (GB)
used as ``runtime_threads`` and ``runtime_memory_gb`` in the Node's
``result.runtime`` attribute. Since the node object is pickled and written to
disk in its working directory, these values are available for analysis after
node or workflow execution by manually parsing the pickle file contents.

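Reading those values back can be sketched as follows, assuming the result is
stored as a gzipped pickle. The ``result_resample_node.pklz`` filename and the
stand-in result classes below are illustrative, not Nipype's exact internals.

::

    import gzip
    import os
    import pickle
    import tempfile

    # Stand-in objects mimicking a node result that carries profiler fields
    class FakeRuntime(object):
        runtime_threads = 3
        runtime_memory_gb = 1.12

    class FakeResult(object):
        runtime = FakeRuntime()

    # Write a gzipped pickle, as a node's working directory might hold one
    workdir = tempfile.mkdtemp()
    pklz_path = os.path.join(workdir, 'result_resample_node.pklz')
    with gzip.open(pklz_path, 'wb') as fp:
        pickle.dump(FakeResult(), fp)

    # Read it back and inspect the recorded runtime statistics
    with gzip.open(pklz_path, 'rb') as fp:
        result = pickle.load(fp)
    print(result.runtime.runtime_threads)    # 3
    print(result.runtime.runtime_memory_gb)  # 1.12
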
Nipype also provides a logging mechanism for saving node runtime statistics to
a JSON-style log file via the ``log_nodes_cb`` logger function. This is enabled
by setting the ``status_callback`` parameter to point to this function in the
``plugin_args`` when using the ``MultiProc`` plugin.

::

    from nipype.pipeline.plugins.callback_log import log_nodes_cb
    args_dict = {'n_procs' : 8, 'memory_gb' : 10, 'status_callback' : log_nodes_cb}

To set the filepath for the callback log, the ``'callback'`` logger must be
configured.

::

    # Set path to log file
    import logging
    callback_log_path = '/home/user/run_stats.log'
    logger = logging.getLogger('callback')
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(callback_log_path)
    logger.addHandler(handler)

Finally, the workflow can be run.

::

    workflow.run(plugin='MultiProc', plugin_args=args_dict)

After the workflow finishes executing, the log file at
``/home/user/run_stats.log`` can be parsed for the runtime statistics. Here is
an example of what the contents would look like:

::

    {"name":"resample_node","id":"resample_node",
    "start":"2016-03-11 21:43:41.682258",
    "estimated_memory_gb":2,"num_threads":1}
    {"name":"resample_node","id":"resample_node",
    "finish":"2016-03-11 21:44:28.357519",
    "estimated_memory_gb":"2","num_threads":"1",
    "runtime_threads":"3","runtime_memory_gb":"1.118469238281"}

Here it can be seen that the number of threads was underestimated while the
amount of memory needed was overestimated. The next time this workflow is run,
the user can change the node interface ``num_threads`` and
``estimated_memory_gb`` parameters to reflect this, for higher pipeline
throughput. Note that the ``runtime_threads`` value is sometimes higher than
expected, particularly for multi-threaded applications. Tools can implement
multi-threading in different ways under the hood; the profiler merely traverses
the process tree to return all running threads associated with that process,
some of which may be active thread-monitoring daemons or transient processes.


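The over- and underestimates can also be computed directly from the log. Below
is a minimal sketch using the standard ``json`` module, assuming one JSON
record per line; the two sample records above are embedded as inline input so
the example is self-contained.

::

    import json

    log_text = (
        '{"name":"resample_node","id":"resample_node",'
        '"start":"2016-03-11 21:43:41.682258",'
        '"estimated_memory_gb":2,"num_threads":1}\n'
        '{"name":"resample_node","id":"resample_node",'
        '"finish":"2016-03-11 21:44:28.357519",'
        '"estimated_memory_gb":"2","num_threads":"1",'
        '"runtime_threads":"3","runtime_memory_gb":"1.118469238281"}\n'
    )

    records = [json.loads(line) for line in log_text.splitlines()]
    for rec in records:
        if 'finish' not in rec:
            continue  # only "finish" records carry the runtime_* fields
        mem_spare = float(rec['estimated_memory_gb']) - float(rec['runtime_memory_gb'])
        thread_deficit = int(rec['runtime_threads']) - int(rec['num_threads'])
        print(rec['name'], round(mem_spare, 2), thread_deficit)
        # resample_node overestimated memory by ~0.88 GB and
        # underestimated threads by 2
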
Visualizing Pipeline Resources
==============================

Nipype provides the ability to visualize the workflow execution based on the
runtimes and system resources each node takes. It does this using the log file
generated from the callback logger after workflow execution, as shown above.
The pandas_ Python package is required to use this feature.

.. _pandas: http://pandas.pydata.org/

::

    from nipype.pipeline.plugins.callback_log import log_nodes_cb
    args_dict = {'n_procs' : 8, 'memory_gb' : 10, 'status_callback' : log_nodes_cb}
    workflow.run(plugin='MultiProc', plugin_args=args_dict)

    # ...workflow finishes and writes callback log to '/home/user/run_stats.log'

    from nipype.utils.draw_gantt_chart import generate_gantt_chart
    generate_gantt_chart('/home/user/run_stats.log', cores=8)
    # ...creates gantt chart in '/home/user/run_stats.log.html'

The ``generate_gantt_chart`` function will create an HTML file that can be
viewed in a browser. Below is an example of the Gantt chart displayed in a web
browser. Note that when the cursor is hovered over any particular node bubble
or resource bubble, some additional information is shown in a pop-up.

.. image:: images/gantt_chart.png
   :width: 100 %

nipype/interfaces/afni/base.py

Lines changed: 4 additions & 0 deletions
@@ -157,6 +157,10 @@ def __init__(self, **inputs):
         else:
             self._output_update()

+        # Propagate the num_threads estimate (default 1) to the
+        # OMP_NUM_THREADS environment variable
+        os.environ['OMP_NUM_THREADS'] = str(self.num_threads)
+
     def _output_update(self):
         """ i think? updates class private attribute based on instance input
         in fsl also updates ENVIRON variable....not valid in afni