diff --git a/doc/users/images/gantt_chart.png b/doc/users/images/gantt_chart.png
new file mode 100644
index 0000000000..e457aa8799
Binary files /dev/null and b/doc/users/images/gantt_chart.png differ
diff --git a/doc/users/resource_sched_profiler.rst b/doc/users/resource_sched_profiler.rst
new file mode 100644
index 0000000000..99d1a7064c
--- /dev/null
+++ b/doc/users/resource_sched_profiler.rst
@@ -0,0 +1,160 @@
+.. _resource_sched_profiler:
+
+=============================================
+Resource Scheduling and Profiling with Nipype
+=============================================
+The latest version of Nipype supports system resource scheduling and profiling.
+These features allow users to ensure high throughput of their data processing
+while also controlling the amount of computing resources a given workflow will
+use.
+
+
+Specifying Resources in the Node Interface
+==========================================
+Each ``Node`` instance interface has two parameters that specify its expected
+thread and memory usage: ``num_threads`` and ``estimated_memory_gb``. If a
+particular node is expected to use 8 threads and 2 GB of memory:
+
+::
+
+    import nipype.pipeline.engine as pe
+    node = pe.Node()
+    node.interface.num_threads = 8
+    node.interface.estimated_memory_gb = 2
+
+If these resource parameters are not set, they default to 1 thread and 1 GB of
+RAM.
+
+
+Resource Scheduler
+==================
+The ``MultiProc`` workflow plugin schedules node execution based on the
+resources used by the currently running nodes and the total resources available
+to the workflow. The plugin uses the plugin arguments ``n_procs`` and
+``memory_gb`` to set the maximum resources a workflow may consume. To limit a
+workflow to using 8 cores and 10 GB of RAM:
+
+::
+
+    args_dict = {'n_procs' : 8, 'memory_gb' : 10}
+    workflow.run(plugin='MultiProc', plugin_args=args_dict)
+
+If these values are not explicitly set, the plugin assumes it can use all of
+the processors and memory on the system. For example, if the machine has 16
+cores and 12 GB of RAM, the workflow will internally assume those values for
+``n_procs`` and ``memory_gb``, respectively.
+
+The plugin then queues eligible nodes for execution based on their expected
+usage via the ``num_threads`` and ``estimated_memory_gb`` interface parameters.
+If the plugin sees that only 3 of its 8 processors and 4 GB of its 10 GB of RAM
+are being used by running nodes, it will attempt to execute the next available
+node as long as its ``num_threads <= 5`` and ``estimated_memory_gb <= 6``. If
+this is not the case, it will continue to check every available node in the
+queue until it finds a node that meets these conditions, or it will wait for an
+executing node to finish and free up the necessary resources. The queue gives
+highest priority to nodes with the largest ``estimated_memory_gb``, followed by
+nodes with the largest expected ``num_threads``.
+
+
+Runtime Profiler and using the Callback Log
+===========================================
+It is not always easy to estimate the amount of resources a particular function
+or command uses. To help with this, Nipype provides some feedback about the
+system resources used by every node during workflow execution via the built-in
+runtime profiler. The runtime profiler is automatically enabled if the
+psutil_ Python package is installed and found on the system.
+
+.. _psutil: https://pythonhosted.org/psutil/
+
+If the package is not found, the workflow will run normally without the runtime
+profiler.
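+
+A quick way to confirm that the profiler's dependency is available is to import
+it directly. This is a minimal sketch; the ``try``/``except`` mirrors the check
+Nipype performs internally before enabling the profiler::
+
+    try:
+        import psutil
+        print('Found psutil %s - runtime profiling enabled' % psutil.__version__)
+    except ImportError:
+        print('psutil not found - runtime profiling disabled')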
+
+The runtime profiler records the number of threads and the amount of memory (GB)
+used as ``runtime_threads`` and ``runtime_memory_gb`` in the Node's
+``result.runtime`` attribute. Since the node object is pickled and written to
+disk in its working directory, these values are available for analysis after
+node or workflow execution by manually parsing the pickle file contents.
+
+Nipype also provides a logging mechanism for saving node runtime statistics to
+a JSON-style log file via the ``log_nodes_cb`` logger function. This is enabled
+by setting the ``status_callback`` parameter to point to this function in the
+``plugin_args`` when using the ``MultiProc`` plugin.
+
+::
+
+    from nipype.pipeline.plugins.callback_log import log_nodes_cb
+    args_dict = {'n_procs' : 8, 'memory_gb' : 10, 'status_callback' : log_nodes_cb}
+
+To set the filepath for the callback log, the ``'callback'`` logger must be
+configured.
+
+::
+
+    # Set path to log file
+    import logging
+    callback_log_path = '/home/user/run_stats.log'
+    logger = logging.getLogger('callback')
+    logger.setLevel(logging.DEBUG)
+    handler = logging.FileHandler(callback_log_path)
+    logger.addHandler(handler)
+
+Finally, the workflow can be run.
+
+::
+
+    workflow.run(plugin='MultiProc', plugin_args=args_dict)
+
+After the workflow finishes executing, the log file at
+"/home/user/run_stats.log" can be parsed for the runtime statistics. Here is an
+example of what the contents would look like:
+
+::
+
+    {"name":"resample_node","id":"resample_node",
+    "start":"2016-03-11 21:43:41.682258",
+    "estimated_memory_gb":2,"num_threads":1}
+    {"name":"resample_node","id":"resample_node",
+    "finish":"2016-03-11 21:44:28.357519",
+    "estimated_memory_gb":"2","num_threads":"1",
+    "runtime_threads":"3","runtime_memory_gb":"1.118469238281"}
+
+Here it can be seen that the number of threads was underestimated, while the
+amount of memory needed was overestimated. The next time this workflow is run,
+the user can change the node interface ``num_threads`` and
+``estimated_memory_gb`` parameters to reflect the measured usage and achieve
+higher pipeline throughput. Note that the ``runtime_threads`` value is sometimes
+higher than expected, particularly for multi-threaded applications. Tools can
+implement multi-threading in different ways under the hood; the profiler merely
+traverses the process tree to return all running threads associated with that
+process, some of which may include active thread-monitoring daemons or
+transient processes.
+
+
+Visualizing Pipeline Resources
+==============================
+Nipype provides the ability to visualize the workflow execution based on the
+runtime and system resources each node uses. It does this using the log file
+generated by the callback logger during workflow execution, as shown above.
+The pandas_ Python package is required to use this feature.
+
+.. _pandas: http://pandas.pydata.org/
+
+::
+
+    from nipype.pipeline.plugins.callback_log import log_nodes_cb
+    args_dict = {'n_procs' : 8, 'memory_gb' : 10, 'status_callback' : log_nodes_cb}
+    workflow.run(plugin='MultiProc', plugin_args=args_dict)
+
+    # ...workflow finishes and writes callback log to '/home/user/run_stats.log'
+
+    from nipype.utils.draw_gantt_chart import generate_gantt_chart
+    generate_gantt_chart('/home/user/run_stats.log', cores=8)
+    # ...creates gantt chart in '/home/user/run_stats.log.html'
+
+The ``generate_gantt_chart`` function will create an HTML file that can be
+viewed in a browser. Below is an example of the gantt chart displayed in a web
+browser.
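+
+In addition to the chart shown below, the raw callback log can be loaded
+programmatically for custom analysis, for example to tabulate per-node
+durations and resource usage. This is a minimal sketch that assumes pandas is
+installed and reuses the log file from the example above; ``log_to_dict`` is a
+helper from the same module as ``generate_gantt_chart``::
+
+    import pandas as pd
+    from nipype.utils.draw_gantt_chart import log_to_dict
+
+    # Parse the callback log into a list of per-node dictionaries
+    nodes = log_to_dict('/home/user/run_stats.log')
+    df = pd.DataFrame(nodes)
+    print(df[['name', 'duration', 'runtime_memory_gb', 'runtime_threads']])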
+Note that when the cursor is hovered over any particular node bubble or resource +bubble, some additional information is shown in a pop-up. + + * - .. image:: images/gantt_chart.png + :width: 100 % diff --git a/nipype/interfaces/afni/base.py b/nipype/interfaces/afni/base.py index ffe9f230b5..431b024780 100644 --- a/nipype/interfaces/afni/base.py +++ b/nipype/interfaces/afni/base.py @@ -157,6 +157,10 @@ def __init__(self, **inputs): else: self._output_update() + # Update num threads estimate from OMP_NUM_THREADS env var + # Default to 1 if not set + os.environ['OMP_NUM_THREADS'] = str(self.num_threads) + def _output_update(self): """ i think? updates class private attribute based on instance input in fsl also updates ENVIRON variable....not valid in afni diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index 08a5e45f35..16b1c95293 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -754,6 +754,8 @@ def __init__(self, **inputs): raise Exception('No input_spec in class: %s' % self.__class__.__name__) self.inputs = self.input_spec(**inputs) + self.estimated_memory_gb = 1 + self.num_threads = 1 @classmethod def help(cls, returnhelp=False): @@ -1192,14 +1194,175 @@ def _read(self, drain): self._lastidx = len(self._rows) +# Get number of threads for process +def _get_num_threads(proc): + """Function to get the number of threads a process is using + NOTE: If + + Parameters + ---------- + proc : psutil.Process instance + the process to evaluate thead usage of + + Returns + ------- + num_threads : int + the number of threads that the process is using + """ + + # Import packages + import psutil + + # If process is running + if proc.status() == psutil.STATUS_RUNNING: + num_threads = proc.num_threads() + elif proc.num_threads() > 1: + tprocs = [psutil.Process(thr.id) for thr in proc.threads()] + alive_tprocs = [tproc for tproc in tprocs if tproc.status() == psutil.STATUS_RUNNING] + num_threads = len(alive_tprocs) + else: + num_threads = 1 + + # Try-block for errors + try: + child_threads = 0 + # Iterate through child processes and get number of their threads + for child in proc.children(recursive=True): + # Leaf process + if len(child.children()) == 0: + # If process is running, get its number of threads + if child.status() == psutil.STATUS_RUNNING: + child_thr = child.num_threads() + # If its not necessarily running, but still multi-threaded + elif child.num_threads() > 1: + # Cast each thread as a process and check for only running + tprocs = [psutil.Process(thr.id) for thr in child.threads()] + alive_tprocs = [tproc for tproc in tprocs if tproc.status() == psutil.STATUS_RUNNING] + child_thr = len(alive_tprocs) + # Otherwise, no threads are running + else: + child_thr = 0 + # Increment child threads + child_threads += child_thr + # Catch any NoSuchProcess errors + except psutil.NoSuchProcess: + pass + + # Number of threads is max between found active children and parent + num_threads = max(child_threads, num_threads) + + # Return number of threads found + return num_threads + + +# Get ram usage of process +def _get_ram_mb(pid, pyfunc=False): + """Function to get the RAM usage of a process and its children + + Parameters + ---------- + pid : integer + the PID of the process to get RAM usage of + pyfunc : boolean (optional); default=False + a flag to indicate if the process is a python function; + when Pythons are multithreaded via multiprocess or threading, + children functions include their own memory + parents. 
if this + is set, the parent memory will removed from children memories + + Reference: http://ftp.dev411.com/t/python/python-list/095thexx8g/multiprocessing-forking-memory-usage + + Returns + ------- + mem_mb : float + the memory RAM in MB utilized by the process PID + """ + + # Import packages + import psutil + + # Init variables + _MB = 1024.0**2 + + # Try block to protect against any dying processes in the interim + try: + # Init parent + parent = psutil.Process(pid) + # Get memory of parent + parent_mem = parent.memory_info().rss + mem_mb = parent_mem/_MB + + # Iterate through child processes + for child in parent.children(recursive=True): + child_mem = child.memory_info().rss + if pyfunc: + child_mem -= parent_mem + mem_mb += child_mem/_MB + + # Catch if process dies, return gracefully + except psutil.NoSuchProcess: + pass + + # Return memory + return mem_mb + + +# Get max resources used for process +def get_max_resources_used(pid, mem_mb, num_threads, pyfunc=False): + """Function to get the RAM and threads usage of a process + + Paramters + --------- + pid : integer + the process ID of process to profile + mem_mb : float + the high memory watermark so far during process execution (in MB) + num_threads: int + the high thread watermark so far during process execution + + Returns + ------- + mem_mb : float + the new high memory watermark of process (MB) + num_threads : float + the new high thread watermark of process + """ + + # Import packages + import psutil + + try: + mem_mb = max(mem_mb, _get_ram_mb(pid, pyfunc=pyfunc)) + num_threads = max(num_threads, _get_num_threads(psutil.Process(pid))) + except Exception as exc: + iflogger.info('Could not get resources used by process. Error: %s'\ + % exc) + + # Return resources + return mem_mb, num_threads + + def run_command(runtime, output=None, timeout=0.01, redirect_x=False): """Run a command, read stdout and stderr, prefix with timestamp. The returned runtime contains a merged stdout+stderr log with timestamps """ - PIPE = subprocess.PIPE + # Init logger + logger = logging.getLogger('workflow') + + # Default to profiling the runtime + try: + import psutil + runtime_profile = True + except ImportError as exc: + logger.info('Unable to import packages needed for runtime profiling. '\ + 'Turning off runtime profiler. 
Reason: %s' % exc) + runtime_profile = False + + # Init variables + PIPE = subprocess.PIPE cmdline = runtime.cmdline + if redirect_x: exist_xvfb, _ = _exists_in_path('xvfb-run', runtime.environ) if not exist_xvfb: @@ -1231,6 +1394,12 @@ def run_command(runtime, output=None, timeout=0.01, redirect_x=False): result = {} errfile = os.path.join(runtime.cwd, 'stderr.nipype') outfile = os.path.join(runtime.cwd, 'stdout.nipype') + + # Init variables for memory profiling + mem_mb = 0 + num_threads = 1 + interval = .5 + if output == 'stream': streams = [Stream('stdout', proc.stdout), Stream('stderr', proc.stderr)] @@ -1246,10 +1415,13 @@ def _process(drain=0): else: for stream in res[0]: stream.read(drain) - while proc.returncode is None: + if runtime_profile: + mem_mb, num_threads = \ + get_max_resources_used(proc.pid, mem_mb, num_threads) proc.poll() _process() + time.sleep(interval) _process(drain=1) # collect results, merge and return @@ -1261,7 +1433,14 @@ def _process(drain=0): result[stream._name] = [r[2] for r in rows] temp.sort() result['merged'] = [r[1] for r in temp] + if output == 'allatonce': + if runtime_profile: + while proc.returncode is None: + mem_mb, num_threads = \ + get_max_resources_used(proc.pid, mem_mb, num_threads) + proc.poll() + time.sleep(interval) stdout, stderr = proc.communicate() stdout = stdout.decode(default_encoding) stderr = stderr.decode(default_encoding) @@ -1269,6 +1448,12 @@ def _process(drain=0): result['stderr'] = stderr.split('\n') result['merged'] = '' if output == 'file': + if runtime_profile: + while proc.returncode is None: + mem_mb, num_threads = \ + get_max_resources_used(proc.pid, mem_mb, num_threads) + proc.poll() + time.sleep(interval) ret_code = proc.wait() stderr.flush() stdout.flush() @@ -1276,10 +1461,19 @@ def _process(drain=0): result['stderr'] = [line.decode(default_encoding).strip() for line in open(errfile, 'rb').readlines()] result['merged'] = '' if output == 'none': + if runtime_profile: + while proc.returncode is None: + mem_mb, num_threads = \ + get_max_resources_used(proc.pid, mem_mb, num_threads) + proc.poll() + time.sleep(interval) proc.communicate() result['stdout'] = [] result['stderr'] = [] result['merged'] = '' + + setattr(runtime, 'runtime_memory_gb', mem_mb/1024.0) + setattr(runtime, 'runtime_threads', num_threads) runtime.stderr = '\n'.join(result['stderr']) runtime.stdout = '\n'.join(result['stdout']) runtime.merged = result['merged'] diff --git a/nipype/interfaces/io.py b/nipype/interfaces/io.py index 7bc0ff358c..0035971f31 100644 --- a/nipype/interfaces/io.py +++ b/nipype/interfaces/io.py @@ -211,7 +211,7 @@ class DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec): encrypt_bucket_keys = traits.Bool(desc='Flag indicating whether to use S3 '\ 'server-side AES-256 encryption') # Set this if user wishes to override the bucket with their own - bucket = traits.Str(desc='Boto3 S3 bucket for manual override of bucket') + bucket = traits.Any(desc='Boto3 S3 bucket for manual override of bucket') # Set this if user wishes to have local copy of files as well local_copy = traits.Str(desc='Copy files locally as well as to S3 bucket') diff --git a/nipype/interfaces/tests/test_runtime_profiler.py b/nipype/interfaces/tests/test_runtime_profiler.py new file mode 100644 index 0000000000..b9adfe5c19 --- /dev/null +++ b/nipype/interfaces/tests/test_runtime_profiler.py @@ -0,0 +1,442 @@ +# test_runtime_profiler.py +# +# Author: Daniel Clark, 2016 + +''' +Module to unit test the runtime_profiler in nipype +''' + +# Import packages 
+import unittest +from nipype.interfaces.base import traits, CommandLine, CommandLineInputSpec + +try: + import psutil + run_profiler = True + skip_profile_msg = 'Run profiler tests' +except ImportError as exc: + skip_profile_msg = 'Missing python packages for runtime profiling, skipping...\n'\ + 'Error: %s' % exc + run_profiler = False + +# UseResources inputspec +class UseResourcesInputSpec(CommandLineInputSpec): + ''' + use_resources cmd interface inputspec + ''' + + # Init attributes + num_gb = traits.Float(desc='Number of GB of RAM to use', + argstr='-g %f') + num_threads = traits.Int(desc='Number of threads to use', + argstr='-p %d') + + +# UseResources interface +class UseResources(CommandLine): + ''' + use_resources cmd interface + ''' + + # Import packages + import os + + # Init attributes + input_spec = UseResourcesInputSpec + + # Get path of executable + exec_dir = os.path.dirname(os.path.realpath(__file__)) + exec_path = os.path.join(exec_dir, 'use_resources') + + # Init cmd + _cmd = exec_path + + +# Spin multiple threads +def use_resources(num_threads, num_gb): + ''' + Function to execute multiple use_gb_ram functions in parallel + ''' + + # Function to occupy GB of memory + def _use_gb_ram(num_gb): + ''' + Function to consume GB of memory + ''' + + # Eat 1 GB of memory for 1 second + gb_str = ' ' * int(num_gb*1024.0**3) + + # Spin CPU + ctr = 0 + while ctr < 30e6: + ctr += 1 + + # Clear memory + del ctr + del gb_str + + # Import packages + from multiprocessing import Process + from threading import Thread + + # Init variables + num_gb = float(num_gb) + + # Build thread list + thread_list = [] + for idx in range(num_threads): + thread = Thread(target=_use_gb_ram, args=(num_gb/num_threads,), + name=str(idx)) + thread_list.append(thread) + + # Run multi-threaded + print('Using %.3f GB of memory over %d sub-threads...' % \ + (num_gb, num_threads)) + for idx, thread in enumerate(thread_list): + thread.start() + + for thread in thread_list: + thread.join() + + +# Test case for the run function +class RuntimeProfilerTestCase(unittest.TestCase): + ''' + This class is a test case for the runtime profiler + + Inherits + -------- + unittest.TestCase class + + Attributes (class): + ------------------ + see unittest.TestCase documentation + + Attributes (instance): + ---------------------- + ''' + + # setUp method for the necessary arguments to run cpac_pipeline.run + def setUp(self): + ''' + Method to instantiate TestCase + + Parameters + ---------- + self : RuntimeProfileTestCase + a unittest.TestCase-inherited class + ''' + + # Init parameters + # Input RAM GB to occupy + self.num_gb = 1.0 + # Input number of sub-threads (not including parent threads) + self.num_threads = 2 + # Acceptable percent error for memory profiled against input + self.mem_err_gb = 0.25 + + # ! Only used for benchmarking the profiler over a range of + # ! RAM usage and number of threads + # ! 
Requires a LOT of RAM to be tested + def _collect_range_runtime_stats(self, num_threads): + ''' + Function to collect a range of runtime stats + ''' + + # Import packages + import json + import numpy as np + import pandas as pd + + # Init variables + ram_gb_range = 10.0 + ram_gb_step = 0.25 + dict_list = [] + + # Iterate through all combos + for num_gb in np.arange(0.25, ram_gb_range+ram_gb_step, ram_gb_step): + # Cmd-level + cmd_start_str, cmd_fin_str = self._run_cmdline_workflow(num_gb, num_threads) + cmd_start_ts = json.loads(cmd_start_str)['start'] + cmd_node_stats = json.loads(cmd_fin_str) + cmd_runtime_threads = int(cmd_node_stats['runtime_threads']) + cmd_runtime_gb = float(cmd_node_stats['runtime_memory_gb']) + cmd_finish_ts = cmd_node_stats['finish'] + + # Func-level + func_start_str, func_fin_str = self._run_function_workflow(num_gb, num_threads) + func_start_ts = json.loads(func_start_str)['start'] + func_node_stats = json.loads(func_fin_str) + func_runtime_threads = int(func_node_stats['runtime_threads']) + func_runtime_gb = float(func_node_stats['runtime_memory_gb']) + func_finish_ts = func_node_stats['finish'] + + # Calc errors + cmd_threads_err = cmd_runtime_threads - num_threads + cmd_gb_err = cmd_runtime_gb - num_gb + func_threads_err = func_runtime_threads - num_threads + func_gb_err = func_runtime_gb - num_gb + + # Node dictionary + results_dict = {'input_threads' : num_threads, + 'input_gb' : num_gb, + 'cmd_runtime_threads' : cmd_runtime_threads, + 'cmd_runtime_gb' : cmd_runtime_gb, + 'func_runtime_threads' : func_runtime_threads, + 'func_runtime_gb' : func_runtime_gb, + 'cmd_threads_err' : cmd_threads_err, + 'cmd_gb_err' : cmd_gb_err, + 'func_threads_err' : func_threads_err, + 'func_gb_err' : func_gb_err, + 'cmd_start_ts' : cmd_start_ts, + 'cmd_finish_ts' : cmd_finish_ts, + 'func_start_ts' : func_start_ts, + 'func_finish_ts' : func_finish_ts} + # Append to list + dict_list.append(results_dict) + + # Create dataframe + runtime_results_df = pd.DataFrame(dict_list) + + # Return dataframe + return runtime_results_df + + # Test node + def _run_cmdline_workflow(self, num_gb, num_threads): + ''' + Function to run the use_resources cmdline script in a nipype workflow + and return the runtime stats recorded by the profiler + + Parameters + ---------- + self : RuntimeProfileTestCase + a unittest.TestCase-inherited class + + Returns + ------- + finish_str : string + a json-compatible dictionary string containing the runtime + statistics of the nipype node that used system resources + ''' + + # Import packages + import logging + import os + import shutil + import tempfile + + import nipype.pipeline.engine as pe + import nipype.interfaces.utility as util + from nipype.pipeline.plugins.callback_log import log_nodes_cb + + # Init variables + base_dir = tempfile.mkdtemp() + log_file = os.path.join(base_dir, 'callback.log') + + # Init logger + logger = logging.getLogger('callback') + logger.setLevel(logging.DEBUG) + handler = logging.FileHandler(log_file) + logger.addHandler(handler) + + # Declare workflow + wf = pe.Workflow(name='test_runtime_prof_cmd') + wf.base_dir = base_dir + + # Input node + input_node = pe.Node(util.IdentityInterface(fields=['num_gb', + 'num_threads']), + name='input_node') + input_node.inputs.num_gb = num_gb + input_node.inputs.num_threads = num_threads + + # Resources used node + resource_node = pe.Node(UseResources(), name='resource_node') + resource_node.interface.estimated_memory_gb = num_gb + resource_node.interface.num_threads = num_threads + + # Connect 
workflow + wf.connect(input_node, 'num_gb', resource_node, 'num_gb') + wf.connect(input_node, 'num_threads', resource_node, 'num_threads') + + # Run workflow + plugin_args = {'n_procs' : num_threads, + 'memory' : num_gb, + 'status_callback' : log_nodes_cb} + wf.run(plugin='MultiProc', plugin_args=plugin_args) + + # Get runtime stats from log file + start_str = open(log_file, 'r').readlines()[0].rstrip('\n') + finish_str = open(log_file, 'r').readlines()[1].rstrip('\n') + + # Delete wf base dir + shutil.rmtree(base_dir) + + # Return runtime stats + return start_str, finish_str + + # Test node + def _run_function_workflow(self, num_gb, num_threads): + ''' + Function to run the use_resources() function in a nipype workflow + and return the runtime stats recorded by the profiler + + Parameters + ---------- + self : RuntimeProfileTestCase + a unittest.TestCase-inherited class + + Returns + ------- + finish_str : string + a json-compatible dictionary string containing the runtime + statistics of the nipype node that used system resources + ''' + + # Import packages + import logging + import os + import shutil + import tempfile + + import nipype.pipeline.engine as pe + import nipype.interfaces.utility as util + from nipype.pipeline.plugins.callback_log import log_nodes_cb + + # Init variables + base_dir = tempfile.mkdtemp() + log_file = os.path.join(base_dir, 'callback.log') + + # Init logger + logger = logging.getLogger('callback') + logger.setLevel(logging.DEBUG) + handler = logging.FileHandler(log_file) + logger.addHandler(handler) + + # Declare workflow + wf = pe.Workflow(name='test_runtime_prof_func') + wf.base_dir = base_dir + + # Input node + input_node = pe.Node(util.IdentityInterface(fields=['num_gb', + 'num_threads']), + name='input_node') + input_node.inputs.num_gb = num_gb + input_node.inputs.num_threads = num_threads + + # Resources used node + resource_node = pe.Node(util.Function(input_names=['num_threads', + 'num_gb'], + output_names=[], + function=use_resources), + name='resource_node') + resource_node.interface.estimated_memory_gb = num_gb + resource_node.interface.num_threads = num_threads + + # Connect workflow + wf.connect(input_node, 'num_gb', resource_node, 'num_gb') + wf.connect(input_node, 'num_threads', resource_node, 'num_threads') + + # Run workflow + plugin_args = {'n_procs' : num_threads, + 'memory' : num_gb, + 'status_callback' : log_nodes_cb} + wf.run(plugin='MultiProc', plugin_args=plugin_args) + + # Get runtime stats from log file + start_str = open(log_file, 'r').readlines()[0].rstrip('\n') + finish_str = open(log_file, 'r').readlines()[1].rstrip('\n') + + # Delete wf base dir + shutil.rmtree(base_dir) + + # Return runtime stats + return start_str, finish_str + + # Test resources were used as expected in cmdline interface + @unittest.skipIf(run_profiler == False, skip_profile_msg) + def test_cmdline_profiling(self): + ''' + Test runtime profiler correctly records workflow RAM/CPUs consumption + from a cmdline function + ''' + + # Import packages + import json + import numpy as np + + # Init variables + num_gb = self.num_gb + num_threads = self.num_threads + + # Run workflow and get stats + start_str, finish_str = self._run_cmdline_workflow(num_gb, num_threads) + # Get runtime stats as dictionary + node_stats = json.loads(finish_str) + + # Read out runtime stats + runtime_gb = float(node_stats['runtime_memory_gb']) + runtime_threads = int(node_stats['runtime_threads']) + + # Get margin of error for RAM GB + allowed_gb_err = self.mem_err_gb + runtime_gb_err = 
np.abs(runtime_gb-num_gb) + # + expected_runtime_threads = num_threads + + # Error message formatting + mem_err = 'Input memory: %f is not within %.3f GB of runtime '\ + 'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb) + threads_err = 'Input threads: %d is not equal to runtime threads: %d' \ + % (expected_runtime_threads, runtime_threads) + + # Assert runtime stats are what was input + self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err) + self.assertEqual(expected_runtime_threads, runtime_threads, msg=threads_err) + + # Test resources were used as expected + @unittest.skipIf(run_profiler == False, skip_profile_msg) + def test_function_profiling(self): + ''' + Test runtime profiler correctly records workflow RAM/CPUs consumption + from a python function + ''' + + # Import packages + import json + import numpy as np + + # Init variables + num_gb = self.num_gb + num_threads = self.num_threads + + # Run workflow and get stats + start_str, finish_str = self._run_function_workflow(num_gb, num_threads) + # Get runtime stats as dictionary + node_stats = json.loads(finish_str) + + # Read out runtime stats + runtime_gb = float(node_stats['runtime_memory_gb']) + runtime_threads = int(node_stats['runtime_threads']) + + # Get margin of error for RAM GB + allowed_gb_err = self.mem_err_gb + runtime_gb_err = np.abs(runtime_gb-num_gb) + # + expected_runtime_threads = num_threads + + # Error message formatting + mem_err = 'Input memory: %f is not within %.3f GB of runtime '\ + 'memory: %f' % (num_gb, self.mem_err_gb, runtime_gb) + threads_err = 'Input threads: %d is not equal to runtime threads: %d' \ + % (expected_runtime_threads, runtime_threads) + + # Assert runtime stats are what was input + self.assertLessEqual(runtime_gb_err, allowed_gb_err, msg=mem_err) + self.assertEqual(expected_runtime_threads, runtime_threads, msg=threads_err) + + +# Command-line run-able unittest module +if __name__ == '__main__': + unittest.main() diff --git a/nipype/interfaces/tests/use_resources b/nipype/interfaces/tests/use_resources new file mode 100755 index 0000000000..06e2d3e906 --- /dev/null +++ b/nipype/interfaces/tests/use_resources @@ -0,0 +1,68 @@ +#!/usr/bin/env python +# +# use_resources + +''' +Python script to use a certain amount of RAM on disk and number of +threads + +Usage: + use_resources -g -p +''' + +# Function to occupy GB of memory +def use_gb_ram(num_gb): + ''' + Function to consume GB of memory + ''' + + # Eat 1 GB of memory for 1 second + gb_str = ' ' * int(num_gb*1024.0**3) + + # Spin CPU + ctr = 0 + while ctr < 30e6: + ctr+= 1 + + # Clear memory + del ctr + del gb_str + + +# Make main executable +if __name__ == '__main__': + + # Import packages + import argparse + from threading import Thread + from multiprocessing import Process + + # Init argparser + parser = argparse.ArgumentParser(description=__doc__) + + # Add arguments + parser.add_argument('-g', '--num_gb', nargs=1, required=True, + help='Number of GB RAM to use, can be float or int') + parser.add_argument('-p', '--num_threads', nargs=1, required=True, + help='Number of threads to run in parallel') + + # Parse args + args = parser.parse_args() + + # Init variables + num_gb = float(args.num_gb[0]) + num_threads = int(args.num_threads[0]) + + # Build thread list + thread_list = [] + for idx in range(num_threads): + thread_list.append(Thread(target=use_gb_ram, args=(num_gb/num_threads,))) + + # Run multi-threaded + print('Using %.3f GB of memory over %d sub-threads...' 
% \ + (num_gb, num_threads)) + for thread in thread_list: + thread.start() + + for thread in thread_list: + thread.join() diff --git a/nipype/interfaces/utility.py b/nipype/interfaces/utility.py index c93081409c..262d01eee9 100644 --- a/nipype/interfaces/utility.py +++ b/nipype/interfaces/utility.py @@ -435,16 +435,71 @@ def _add_output_traits(self, base): return base def _run_interface(self, runtime): + # Get workflow logger for runtime profile error reporting + from nipype import logging + logger = logging.getLogger('workflow') + + # Create function handle function_handle = create_function_from_source(self.inputs.function_str, self.imports) + # Wrapper for running function handle in multiprocessing.Process + # Can catch exceptions and report output via multiprocessing.Queue + def _function_handle_wrapper(queue, **kwargs): + try: + out = function_handle(**kwargs) + queue.put(out) + except Exception as exc: + queue.put(exc) + + # Get function args args = {} for name in self._input_names: value = getattr(self.inputs, name) if isdefined(value): args[name] = value - out = function_handle(**args) + # Runtime profiler on if dependecies available + try: + import psutil + from nipype.interfaces.base import get_max_resources_used + import multiprocessing + runtime_profile = True + except ImportError as exc: + logger.info('Unable to import packages needed for runtime profiling. '\ + 'Turning off runtime profiler. Reason: %s' % exc) + runtime_profile = False + + # Profile resources if set + #runtime_profile=False + if runtime_profile: + # Init communication queue and proc objs + queue = multiprocessing.Queue() + proc = multiprocessing.Process(target=_function_handle_wrapper, + args=(queue,), kwargs=args) + + # Init memory and threads before profiling + mem_mb = 0 + num_threads = 0 + + # Start process and profile while it's alive + proc.start() + while proc.is_alive(): + mem_mb, num_threads = \ + get_max_resources_used(proc.pid, mem_mb, num_threads, + pyfunc=True) + + # Get result from process queue + out = queue.get() + # If it is an exception, raise it + if isinstance(out, Exception): + raise out + + # Function ran successfully, populate runtime stats + setattr(runtime, 'runtime_memory_gb', mem_mb/1024.0) + setattr(runtime, 'runtime_threads', num_threads) + else: + out = function_handle(**args) if len(self._output_names) == 1: self._out[self._output_names[0]] = out diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index f2fef3cdae..531b745747 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -737,15 +737,20 @@ def write_report(self, report_type=None, cwd=None): fp.close() return fp.writelines(write_rst_header('Runtime info', level=1)) + # Init rst dictionary of runtime stats + rst_dict = {'hostname' : self.result.runtime.hostname, + 'duration' : self.result.runtime.duration} + # Try and insert memory/threads usage if available + try: + rst_dict['runtime_memory_gb'] = self.result.runtime.runtime_memory_gb + rst_dict['runtime_threads'] = self.result.runtime.runtime_threads + except AttributeError: + logger.info('Runtime memory and threads stats unavailable') if hasattr(self.result.runtime, 'cmdline'): - fp.writelines(write_rst_dict( - {'hostname': self.result.runtime.hostname, - 'duration': self.result.runtime.duration, - 'command': self.result.runtime.cmdline})) + rst_dict['command'] = self.result.runtime.cmdline + fp.writelines(write_rst_dict(rst_dict)) else: - fp.writelines(write_rst_dict( - {'hostname': self.result.runtime.hostname, 
- 'duration': self.result.runtime.duration})) + fp.writelines(write_rst_dict(rst_dict)) if hasattr(self.result.runtime, 'merged'): fp.writelines(write_rst_header('Terminal output', level=2)) fp.writelines(write_rst_list(self.result.runtime.merged)) diff --git a/nipype/pipeline/plugins/__init__.py b/nipype/pipeline/plugins/__init__.py index 26d1577f55..0bf1a8d2f5 100644 --- a/nipype/pipeline/plugins/__init__.py +++ b/nipype/pipeline/plugins/__init__.py @@ -17,3 +17,6 @@ from .lsf import LSFPlugin from .slurm import SLURMPlugin from .slurmgraph import SLURMGraphPlugin + +from .callback_log import log_nodes_cb +from . import semaphore_singleton diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 3f1f216ac6..9e73f1b048 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -235,6 +235,7 @@ def run(self, graph, config, updatehash=False): notrun = [] while np.any(self.proc_done == False) | \ np.any(self.proc_pending == True): + toappend = [] # trigger callbacks for any pending results while self.pending_tasks: @@ -265,10 +266,16 @@ def run(self, graph, config, updatehash=False): graph=graph) else: logger.debug('Not submitting') - sleep(float(self._config['execution']['poll_sleep_duration'])) + self._wait() + self._remove_node_dirs() report_nodes_not_run(notrun) + + + def _wait(self): + sleep(float(self._config['execution']['poll_sleep_duration'])) + def _get_result(self, taskid): raise NotImplementedError diff --git a/nipype/pipeline/plugins/callback_log.py b/nipype/pipeline/plugins/callback_log.py new file mode 100644 index 0000000000..14287bda07 --- /dev/null +++ b/nipype/pipeline/plugins/callback_log.py @@ -0,0 +1,65 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +"""Callback logger for recording workflow and node run stats +""" + + +# Log node stats function +def log_nodes_cb(node, status): + """Function to record node run statistics to a log file as json + dictionaries + + Parameters + ---------- + node : nipype.pipeline.engine.Node + the node being logged + status : string + acceptable values are 'start', 'end'; otherwise it is + considered and error + + Returns + ------- + None + this function does not return any values, it logs the node + status info to the callback logger + """ + + # Import packages + import datetime + import logging + import json + + # Check runtime profile stats + if node.result is not None: + try: + runtime = node.result.runtime + runtime_memory_gb = runtime.runtime_memory_gb + runtime_threads = runtime.runtime_threads + except AttributeError: + runtime_memory_gb = runtime_threads = 'Unknown' + else: + runtime_memory_gb = runtime_threads = 'N/A' + + # Init variables + logger = logging.getLogger('callback') + status_dict = {'name' : node.name, + 'id' : node._id, + 'estimated_memory_gb' : node._interface.estimated_memory_gb, + 'num_threads' : node._interface.num_threads} + + # Check status and write to log + # Start + if status == 'start': + status_dict['start'] = str(datetime.datetime.now()) + # End + elif status == 'end': + status_dict['finish'] = str(datetime.datetime.now()) + status_dict['runtime_threads'] = runtime_threads + status_dict['runtime_memory_gb'] = runtime_memory_gb + # Other + else: + status_dict['finish'] = str(datetime.datetime.now()) + status_dict['error'] = True + + # Dump string to log + logger.debug(json.dumps(status_dict)) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py 
b/nipype/pipeline/plugins/multiproc.py
index 861e2cc507..d2a7f7f9b9 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -6,21 +6,53 @@ http://stackoverflow.com/a/8963618/1183453 """ +# Import packages from multiprocessing import Process, Pool, cpu_count, pool from traceback import format_exception +import os import sys +import numpy as np +from copy import deepcopy +from ..engine import MapNode +from ...utils.misc import str2bool +from ... import logging +from nipype.pipeline.plugins import semaphore_singleton from .base import (DistributedPluginBase, report_crash) +# Init logger +logger = logging.getLogger('workflow') +# Run node def run_node(node, updatehash): + """Function to execute node.run(), catch and log any errors and + return the result dictionary + + Parameters + ---------- + node : nipype Node instance + the node to run + updatehash : boolean + flag for updating hash + + Returns + ------- + result : dictionary + dictionary containing the node runtime results and stats + """ + + # Init variables result = dict(result=None, traceback=None) + + # Try and execute the node via node.run() try: result['result'] = node.run(updatehash=updatehash) except: etype, eval, etr = sys.exc_info() result['traceback'] = format_exception(etype, eval, etr) result['result'] = node.result + + # Return the result dictionary return result @@ -42,33 +74,90 @@ class NonDaemonPool(pool.Pool): Process = NonDaemonProcess +def release_lock(args): + semaphore_singleton.semaphore.release() + + +# Get total system RAM +def get_system_total_memory_gb(): + """Function to get the total RAM of the running system in GB + """ + + # Import packages + import os + import sys + + # Get memory + if 'linux' in sys.platform: + with open('/proc/meminfo', 'r') as f_in: + meminfo_lines = f_in.readlines() + mem_total_line = [line for line in meminfo_lines \ + if 'MemTotal' in line][0] + mem_total = float(mem_total_line.split()[1]) + memory_gb = mem_total/(1024.0**2) + elif 'darwin' in sys.platform: + mem_str = os.popen('sysctl hw.memsize').read().strip().split(' ')[-1] + memory_gb = float(mem_str)/(1024.0**3) + else: + err_msg = 'System platform: %s is not supported' + raise Exception(err_msg) + + # Return memory + return memory_gb + + class MultiProcPlugin(DistributedPluginBase): - """Execute workflow with multiprocessing + """Execute workflow with multiprocessing, not sending more jobs at once + than the system can support. The plugin_args input to run can be used to control the multiprocessing - execution. Currently supported options are: + execution and defining the maximum amount of memory and threads that + should be used. When those parameters are not specified, + the number of threads and memory of the system is used. + + System consuming nodes should be tagged: + memory_consuming_node.interface.estimated_memory_gb = 8 + thread_consuming_node.interface.num_threads = 16 + + The default number of threads and memory for a node is 1. + + Currently supported options are: - - n_procs : number of processes to use - non_daemon : boolean flag to execute as non-daemon processes + - n_procs: maximum number of threads to be executed in parallel + - memory_gb: maximum memory (in GB) that can be used at once. 
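+
+    For example, a typical invocation might look like the following sketch,
+    where ``workflow`` is any Nipype workflow and the plugin arguments follow
+    the options listed above::
+
+        workflow.run(plugin='MultiProc',
+                     plugin_args={'n_procs': 8, 'memory_gb': 10})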
""" def __init__(self, plugin_args=None): + # Init variables and instance attributes super(MultiProcPlugin, self).__init__(plugin_args=plugin_args) self._taskresult = {} self._taskid = 0 non_daemon = True - n_procs = cpu_count() - if plugin_args: - if 'n_procs' in plugin_args: - n_procs = plugin_args['n_procs'] - if 'non_daemon' in plugin_args: + self.plugin_args = plugin_args + self.processors = cpu_count() + self.memory_gb = get_system_total_memory_gb()*0.9 # 90% of system memory + + # Check plugin args + if self.plugin_args: + if 'non_daemon' in self.plugin_args: non_daemon = plugin_args['non_daemon'] + if 'n_procs' in self.plugin_args: + self.processors = self.plugin_args['n_procs'] + if 'memory_gb' in self.plugin_args: + self.memory_gb = self.plugin_args['memory_gb'] + # Instantiate different thread pools for non-daemon processes if non_daemon: # run the execution using the non-daemon pool subclass - self.pool = NonDaemonPool(processes=n_procs) + self.pool = NonDaemonPool(processes=self.processors) else: - self.pool = Pool(processes=n_procs) + self.pool = Pool(processes=self.processors) + + def _wait(self): + if len(self.pending_tasks) > 0: + semaphore_singleton.semaphore.acquire() + semaphore_singleton.semaphore.release() def _get_result(self, taskid): if taskid not in self._taskresult: @@ -77,18 +166,6 @@ def _get_result(self, taskid): return None return self._taskresult[taskid].get() - def _submit_job(self, node, updatehash=False): - self._taskid += 1 - try: - if node.inputs.terminal_output == 'stream': - node.inputs.terminal_output = 'allatonce' - except: - pass - self._taskresult[self._taskid] = self.pool.apply_async(run_node, - (node, - updatehash,)) - return self._taskid - def _report_crash(self, node, result=None): if result and result['traceback']: node._result = result['result'] @@ -100,3 +177,132 @@ def _report_crash(self, node, result=None): def _clear_task(self, taskid): del self._taskresult[taskid] + + def _submit_job(self, node, updatehash=False): + self._taskid += 1 + if hasattr(node.inputs, 'terminal_output'): + if node.inputs.terminal_output == 'stream': + node.inputs.terminal_output = 'allatonce' + + self._taskresult[self._taskid] = \ + self.pool.apply_async(run_node, + (node, updatehash), + callback=release_lock) + return self._taskid + + def _send_procs_to_workers(self, updatehash=False, graph=None): + """ Sends jobs to workers when system resources are available. + Check memory (gb) and cores usage before running jobs. 
+ """ + executing_now = [] + + # Check to see if a job is available + jobids = np.flatnonzero((self.proc_pending == True) & \ + (self.depidx.sum(axis=0) == 0).__array__()) + + # Check available system resources by summing all threads and memory used + busy_memory_gb = 0 + busy_processors = 0 + for jobid in jobids: + busy_memory_gb += self.procs[jobid]._interface.estimated_memory_gb + busy_processors += self.procs[jobid]._interface.num_threads + + free_memory_gb = self.memory_gb - busy_memory_gb + free_processors = self.processors - busy_processors + + # Check all jobs without dependency not run + jobids = np.flatnonzero((self.proc_done == False) & \ + (self.depidx.sum(axis=0) == 0).__array__()) + + # Sort jobs ready to run first by memory and then by number of threads + # The most resource consuming jobs run first + jobids = sorted(jobids, + key=lambda item: (self.procs[item]._interface.estimated_memory_gb, + self.procs[item]._interface.num_threads)) + + logger.debug('Free memory (GB): %d, Free processors: %d', + free_memory_gb, free_processors) + + # While have enough memory and processors for first job + # Submit first job on the list + for jobid in jobids: + logger.debug('Next Job: %d, memory (GB): %d, threads: %d' \ + % (jobid, self.procs[jobid]._interface.estimated_memory_gb, + self.procs[jobid]._interface.num_threads)) + + if self.procs[jobid]._interface.estimated_memory_gb <= free_memory_gb and \ + self.procs[jobid]._interface.num_threads <= free_processors: + logger.info('Executing: %s ID: %d' %(self.procs[jobid]._id, jobid)) + executing_now.append(self.procs[jobid]) + + if isinstance(self.procs[jobid], MapNode): + try: + num_subnodes = self.procs[jobid].num_subnodes() + except Exception: + etype, eval, etr = sys.exc_info() + traceback = format_exception(etype, eval, etr) + report_crash(self.procs[jobid], traceback=traceback) + self._clean_queue(jobid, graph) + self.proc_pending[jobid] = False + continue + if num_subnodes > 1: + submit = self._submit_mapnode(jobid) + if not submit: + continue + + # change job status in appropriate queues + self.proc_done[jobid] = True + self.proc_pending[jobid] = True + + free_memory_gb -= self.procs[jobid]._interface.estimated_memory_gb + free_processors -= self.procs[jobid]._interface.num_threads + + # Send job to task manager and add to pending tasks + if self._status_callback: + self._status_callback(self.procs[jobid], 'start') + if str2bool(self.procs[jobid].config['execution']['local_hash_check']): + logger.debug('checking hash locally') + try: + hash_exists, _, _, _ = self.procs[ + jobid].hash_exists() + logger.debug('Hash exists %s' % str(hash_exists)) + if (hash_exists and (self.procs[jobid].overwrite == False or \ + (self.procs[jobid].overwrite == None and \ + not self.procs[jobid]._interface.always_run))): + self._task_finished_cb(jobid) + self._remove_node_dirs() + continue + except Exception: + etype, eval, etr = sys.exc_info() + traceback = format_exception(etype, eval, etr) + report_crash(self.procs[jobid], traceback=traceback) + self._clean_queue(jobid, graph) + self.proc_pending[jobid] = False + continue + logger.debug('Finished checking hash') + + if self.procs[jobid].run_without_submitting: + logger.debug('Running node %s on master thread' \ + % self.procs[jobid]) + try: + self.procs[jobid].run() + except Exception: + etype, eval, etr = sys.exc_info() + traceback = format_exception(etype, eval, etr) + report_crash(self.procs[jobid], traceback=traceback) + self._task_finished_cb(jobid) + self._remove_node_dirs() + + else: + 
logger.debug('submitting %s' % str(jobid)) + tid = self._submit_job(deepcopy(self.procs[jobid]), + updatehash=updatehash) + if tid is None: + self.proc_done[jobid] = False + self.proc_pending[jobid] = False + else: + self.pending_tasks.insert(0, (tid, jobid)) + else: + break + + logger.debug('No jobs waiting to execute') diff --git a/nipype/pipeline/plugins/semaphore_singleton.py b/nipype/pipeline/plugins/semaphore_singleton.py new file mode 100644 index 0000000000..99c7752b82 --- /dev/null +++ b/nipype/pipeline/plugins/semaphore_singleton.py @@ -0,0 +1,2 @@ +import threading +semaphore = threading.Semaphore(1) diff --git a/nipype/pipeline/plugins/tests/test_callback.py b/nipype/pipeline/plugins/tests/test_callback.py index db02bc889b..0769781e8a 100644 --- a/nipype/pipeline/plugins/tests/test_callback.py +++ b/nipype/pipeline/plugins/tests/test_callback.py @@ -26,7 +26,7 @@ class Status(object): def __init__(self): self.statuses = [] - def callback(self, node, status): + def callback(self, node, status, result=None): self.statuses.append((node, status)) @@ -93,7 +93,6 @@ def test_callback_multiproc_exception(): name='f_node') wf.add_nodes([f_node]) wf.config['execution']['crashdump_dir'] = wf.base_dir - wf.config['execution']['poll_sleep_duration'] = 2 try: wf.run(plugin='MultiProc', plugin_args={'status_callback': so.callback}) diff --git a/nipype/pipeline/plugins/tests/test_multiproc.py b/nipype/pipeline/plugins/tests/test_multiproc.py index efa9ec4161..82574d07fe 100644 --- a/nipype/pipeline/plugins/tests/test_multiproc.py +++ b/nipype/pipeline/plugins/tests/test_multiproc.py @@ -1,11 +1,15 @@ +import logging import os -import nipype.interfaces.base as nib from tempfile import mkdtemp from shutil import rmtree +from multiprocessing import cpu_count +import nipype.interfaces.base as nib +from nipype.utils import draw_gantt_chart from nipype.testing import assert_equal import nipype.pipeline.engine as pe - +from nipype.pipeline.plugins.callback_log import log_nodes_cb +from nipype.pipeline.plugins.multiproc import get_system_total_memory_gb class InputSpec(nib.TraitedSpec): input1 = nib.traits.Int(desc='a random int') @@ -51,3 +55,182 @@ def test_run_multiproc(): yield assert_equal, result, [1, 1] os.chdir(cur_dir) rmtree(temp_dir) + + +class InputSpecSingleNode(nib.TraitedSpec): + input1 = nib.traits.Int(desc='a random int') + input2 = nib.traits.Int(desc='a random int') + + +class OutputSpecSingleNode(nib.TraitedSpec): + output1 = nib.traits.Int(desc='a random int') + + +class TestInterfaceSingleNode(nib.BaseInterface): + input_spec = InputSpecSingleNode + output_spec = OutputSpecSingleNode + + def _run_interface(self, runtime): + runtime.returncode = 0 + return runtime + + def _list_outputs(self): + outputs = self._outputs().get() + outputs['output1'] = self.inputs.input1 + return outputs + + +def find_metrics(nodes, last_node): + """ + """ + + # Import packages + from dateutil.parser import parse + import datetime + + start = nodes[0]['start'] + total_duration = int((last_node['finish'] - start).total_seconds()) + + total_memory = [] + total_threads = [] + for i in range(total_duration): + total_memory.append(0) + total_threads.append(0) + + now = start + for i in range(total_duration): + start_index = 0 + node_start = None + node_finish = None + + x = now + + for j in range(start_index, len(nodes)): + node_start = nodes[j]['start'] + node_finish = nodes[j]['finish'] + + if node_start < x and node_finish > x: + total_memory[i] += float(nodes[j]['estimated_memory_gb']) + 
total_threads[i] += int(nodes[j]['num_threads']) + start_index = j + + if node_start > x: + break + + now += datetime.timedelta(seconds=1) + + return total_memory, total_threads + + +def test_do_not_use_more_memory_then_specified(): + LOG_FILENAME = 'callback.log' + my_logger = logging.getLogger('callback') + my_logger.setLevel(logging.DEBUG) + + # Add the log message handler to the logger + handler = logging.FileHandler(LOG_FILENAME) + my_logger.addHandler(handler) + + max_memory = 1 + pipe = pe.Workflow(name='pipe') + n1 = pe.Node(interface=TestInterfaceSingleNode(), name='n1') + n2 = pe.Node(interface=TestInterfaceSingleNode(), name='n2') + n3 = pe.Node(interface=TestInterfaceSingleNode(), name='n3') + n4 = pe.Node(interface=TestInterfaceSingleNode(), name='n4') + + n1.interface.estimated_memory_gb = 1 + n2.interface.estimated_memory_gb = 1 + n3.interface.estimated_memory_gb = 1 + n4.interface.estimated_memory_gb = 1 + + pipe.connect(n1, 'output1', n2, 'input1') + pipe.connect(n1, 'output1', n3, 'input1') + pipe.connect(n2, 'output1', n4, 'input1') + pipe.connect(n3, 'output1', n4, 'input2') + n1.inputs.input1 = 1 + + pipe.run(plugin='MultiProc', + plugin_args={'memory': max_memory, + 'status_callback': log_nodes_cb}) + + + nodes = draw_gantt_chart.log_to_dict(LOG_FILENAME) + last_node = nodes[-1] + #usage in every second + memory, threads = find_metrics(nodes, last_node) + + result = True + for m in memory: + if m > max_memory: + result = False + break + + yield assert_equal, result, True + + max_threads = cpu_count() + + result = True + for t in threads: + if t > max_threads: + result = False + break + + yield assert_equal, result, True,\ + "using more threads than system has (threads is not specified by user)" + + os.remove(LOG_FILENAME) + + +def test_do_not_use_more_threads_then_specified(): + LOG_FILENAME = 'callback.log' + my_logger = logging.getLogger('callback') + my_logger.setLevel(logging.DEBUG) + + # Add the log message handler to the logger + handler = logging.FileHandler(LOG_FILENAME) + my_logger.addHandler(handler) + + max_threads = 4 + pipe = pe.Workflow(name='pipe') + n1 = pe.Node(interface=TestInterfaceSingleNode(), name='n1') + n2 = pe.Node(interface=TestInterfaceSingleNode(), name='n2') + n3 = pe.Node(interface=TestInterfaceSingleNode(), name='n3') + n4 = pe.Node(interface=TestInterfaceSingleNode(), name='n4') + + n1.interface.num_threads = 1 + n2.interface.num_threads = 1 + n3.interface.num_threads = 4 + n4.interface.num_threads = 1 + + pipe.connect(n1, 'output1', n2, 'input1') + pipe.connect(n1, 'output1', n3, 'input1') + pipe.connect(n2, 'output1', n4, 'input1') + pipe.connect(n3, 'output1', n4, 'input2') + n1.inputs.input1 = 4 + pipe.config['execution']['poll_sleep_duration'] = 1 + pipe.run(plugin='MultiProc', plugin_args={'n_procs': max_threads, + 'status_callback': log_nodes_cb}) + + nodes = draw_gantt_chart.log_to_dict(LOG_FILENAME) + last_node = nodes[-1] + #usage in every second + memory, threads = find_metrics(nodes, last_node) + + result = True + for t in threads: + if t > max_threads: + result = False + break + + yield assert_equal, result, True, "using more threads than specified" + + max_memory = get_system_total_memory_gb() + result = True + for m in memory: + if m > max_memory: + result = False + break + yield assert_equal, result, True,\ + "using more memory than system has (memory is not specified by user)" + + os.remove(LOG_FILENAME) diff --git a/nipype/pipeline/plugins/tests/test_multiproc_nondaemon.py 
b/nipype/pipeline/plugins/tests/test_multiproc_nondaemon.py index 89336c2026..cdba9da5b5 100644 --- a/nipype/pipeline/plugins/tests/test_multiproc_nondaemon.py +++ b/nipype/pipeline/plugins/tests/test_multiproc_nondaemon.py @@ -1,3 +1,9 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +"""Testing module for functions and classes from multiproc.py +""" + +# Import packages from builtins import range import os from tempfile import mkdtemp @@ -15,9 +21,9 @@ def mytestFunction(insum=0): # need to import here since this is executed as an external process import multiprocessing + import os import tempfile import time - import os numberOfThreads = 2 @@ -74,17 +80,18 @@ def dummyFunction(filename): # read in all temp files and sum them up total = insum - for file in f: - with open(file) as fd: + for ff in f: + with open(ff) as fd: total += int(fd.read()) - os.remove(file) + os.remove(ff) return total def run_multiproc_nondaemon_with_flag(nondaemon_flag): ''' - Start a pipe with two nodes using the multiproc plugin and passing the nondaemon_flag. + Start a pipe with two nodes using the resource multiproc plugin and + passing the nondaemon_flag. ''' cur_dir = os.getcwd() @@ -107,7 +114,6 @@ def run_multiproc_nondaemon_with_flag(nondaemon_flag): f1.inputs.insum = 0 pipe.config['execution']['stop_on_first_crash'] = True - pipe.config['execution']['poll_sleep_duration'] = 2 # execute the pipe using the MultiProc plugin with 2 processes and the non_daemon flag # to enable child processes which start other multiprocessing jobs diff --git a/nipype/utils/draw_gantt_chart.py b/nipype/utils/draw_gantt_chart.py new file mode 100644 index 0000000000..d4fdc88830 --- /dev/null +++ b/nipype/utils/draw_gantt_chart.py @@ -0,0 +1,576 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +"""Module to draw an html gantt chart from logfile produced by +callback_log.log_nodes_cb() +""" + +# Import packages +import json +from dateutil import parser +import datetime +import random +from collections import OrderedDict +# Pandas +try: + import pandas as pd +except ImportError: + print('Pandas not found; in order for full functionality of this module '\ + 'install the pandas package') + pass + +def create_event_dict(start_time, nodes_list): + ''' + Function to generate a dictionary of event (start/finish) nodes + from the nodes list + + Parameters + ---------- + start_time : datetime.datetime + a datetime object of the pipeline start time + nodes_list : list + a list of the node dictionaries that were run in the pipeline + + Returns + ------- + events : dictionary + a dictionary where the key is the timedelta from the start of + the pipeline execution to the value node it accompanies + ''' + + # Import packages + import copy + + events = {} + for node in nodes_list: + # Format node fields + estimated_threads = node.get('num_threads', 1) + estimated_memory_gb = node.get('estimated_memory_gb', 1.0) + runtime_threads = node.get('runtime_threads', 0) + runtime_memory_gb = node.get('runtime_memory_gb', 0.0) + + # Init and format event-based nodes + node['estimated_threads'] = estimated_threads + node['estimated_memory_gb'] = estimated_memory_gb + node['runtime_threads'] = runtime_threads + node['runtime_memory_gb'] = runtime_memory_gb + start_node = node + finish_node = copy.deepcopy(node) + start_node['event'] = 'start' + finish_node['event'] = 'finish' + + # Get dictionary key + start_delta = 
(node['start'] - start_time).total_seconds() + finish_delta = (node['finish'] - start_time).total_seconds() + + # Populate dictionary + if start_delta in events or finish_delta in events: + err_msg = 'Event logged twice or events started at exact same time!' + raise KeyError(err_msg) + events[start_delta] = start_node + events[finish_delta] = finish_node + + # Return events dictionary + return events + + +def log_to_dict(logfile): + ''' + Function to extract log node dictionaries into a list of python + dictionaries and return the list + + Parameters + ---------- + logfile : string + path to the json-formatted log file generated from a nipype + workflow execution + + Returns + ------- + nodes_list : list + a list of python dictionaries containing the runtime info + for each nipype node + ''' + + # Init variables + # keep track of important vars + nodes_list = [] # all the parsed nodes + unfinished_nodes = [] # all start nodes that don't have a finish yet + + with open(logfile, 'r') as content: + # read the file, separating each line + content = content.read() + lines = content.split('\n') + + for l in lines: + # try to parse each line and transform it into a json dict; + # if the line has a bad format, just skip it + node = None + try: + node = json.loads(l) + except ValueError: + pass + + if not node: + continue + + # if it is a start node, add it to the unfinished nodes + if 'start' in node: + node['start'] = parser.parse(node['start']) + unfinished_nodes.append(node) + + # if it is a finish node, look in the unfinished nodes for the matching start, + # remove it from the unfinished list and add it to the node list + elif 'finish' in node: + node['finish'] = parser.parse(node['finish']) + # because most nodes are small, we look backwards in the unfinished list + for s in range(len(unfinished_nodes)): + aux = unfinished_nodes[s] + # found the matching start for this finish node; copy over info + if aux['id'] == node['id'] and aux['name'] == node['name'] \ + and aux['start'] < node['finish']: + node['start'] = aux['start'] + node['duration'] = \ + (node['finish'] - node['start']).total_seconds() + + unfinished_nodes.remove(aux) + nodes_list.append(node) + break + + # finished parsing + # assume nodes without a finish entry did not finish running + # and set their finish to that of the last node run + last_node = nodes_list[-1] + for n in unfinished_nodes: + n['finish'] = last_node['finish'] + n['duration'] = (n['finish'] - n['start']).total_seconds() + nodes_list.append(n) + + # Return list of nodes + return nodes_list + + +def calculate_resource_timeseries(events, resource): + ''' + Given an event dictionary, calculate the resources used + as a timeseries + + Parameters + ---------- + events : dictionary + a dictionary of event-based node dictionaries of the workflow + execution statistics + resource : string + the resource of interest to return the time-series of; + e.g. 
'runtime_memory_gb', 'estimated_threads', etc + + Returns + ------- + time_series : pandas Series + a pandas Series object that contains timestamps as the indices + and the resource amount as values + ''' + + # Import packages + import pandas as pd + + # Init variables + res = OrderedDict() + all_res = 0.0 + + # Iterate through the events + for tdelta, event in sorted(events.items()): + if event['event'] == "start": + if resource in event and event[resource] != 'Unknown': + all_res += float(event[resource]) + current_time = event['start']; + elif event['event'] == "finish": + if resource in event and event[resource] != 'Unknown': + all_res -= float(event[resource]) + current_time = event['finish']; + res[current_time] = all_res + + # Formulate the pandas timeseries + time_series = pd.Series(data=res.values(), index=res.keys()) + # Downsample where there is only value-diff + ts_diff = time_series.diff() + time_series = time_series[ts_diff!=0] + + # Return the new time series + return time_series + + +def draw_lines(start, total_duration, minute_scale, scale): + ''' + Function to draw the minute line markers and timestamps + + Parameters + ---------- + start : datetime.datetime obj + start time for first minute line marker + total_duration : float + total duration of the workflow execution (in seconds) + minute_scale : integer + the scale, in minutes, at which to plot line markers for the + gantt chart; for example, minute_scale=10 means there are lines + drawn at every 10 minute interval from start to finish + scale : integer + scale factor in pixel spacing between minute line markers + + Returns + ------- + result : string + the html-formatted string for producing the minutes-based + time line markers + ''' + + # Init variables + result = '' + next_line = 220 + next_time = start + num_lines = int((total_duration/60) / minute_scale) + 2 + + # Iterate through the lines and create html line markers string + for line in range(num_lines): + # Line object + new_line = "
" % next_line + result += new_line + # Time digits + time = "

%02d:%02d

" % \ + (next_line-20, next_time.hour, next_time.minute) + result += time + # Increment line spacing and digits + next_line += minute_scale * scale + next_time += datetime.timedelta(minutes=minute_scale) + + # Return html string for time line markers + return result + + +def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, + colors): + ''' + Function to return the html-string of the node drawings for the + gantt chart + + Parameters + ---------- + start : datetime.datetime obj + start time for first node + nodes_list : list + a list of the node dictionaries + cores : integer + the number of cores given to the workflow via the 'n_procs' + plugin arg + total_duration : float + total duration of the workflow execution (in seconds) + minute_scale : integer + the scale, in minutes, at which to plot line markers for the + gantt chart; for example, minute_scale=10 means there are lines + drawn at every 10 minute interval from start to finish + space_between_minutes : integer + scale factor in pixel spacing between minute line markers + colors : list + a list of colors to choose from when coloring the nodes in the + gantt chart + + Returns + ------- + result : string + the html-formatted string for producing the minutes-based + time line markers + ''' + + # Init variables + result = '' + scale = float(space_between_minutes/float(minute_scale)) + space_between_minutes = float(space_between_minutes/scale) + end_times = [datetime.datetime(start.year, start.month, start.day, + start.hour, start.minute, start.second) \ + for core in range(cores)] + + # For each node in the pipeline + for node in nodes_list: + # Get start and finish times + node_start = node['start'] + node_finish = node['finish'] + # Calculate an offset and scale duration + offset = ((node_start - start).total_seconds() / 60) * scale * \ + space_between_minutes + 220 + # Scale duration + scale_duration = (node['duration'] / 60) * scale * space_between_minutes + if scale_duration < 5: + scale_duration = 5 + scale_duration -= 2 + # Left + left = 60 + for core in range(len(end_times)): + if end_times[core] < node_start: + left += core * 30 + end_times[core] = datetime.datetime(node_finish.year, + node_finish.month, + node_finish.day, + node_finish.hour, + node_finish.minute, + node_finish.second) + break + + # Get color for node object + color = random.choice(colors) + if 'error' in node: + color = 'red' + + # Setup dictionary for node html string insertion + node_dict = {'left' : left, + 'offset' : offset, + 'scale_duration' : scale_duration, + 'color' : color, + 'node_name' : node['name'], + 'node_dur' : node['duration']/60.0, + 'node_start' : node_start.strftime("%Y-%m-%d %H:%M:%S"), + 'node_finish' : node_finish.strftime("%Y-%m-%d %H:%M:%S")} + # Create new node string + new_node = "
" % \ + node_dict + + # Append to output result + result += new_node + + # Return html string for nodes + return result + +def draw_resource_bar(start_time, finish_time, time_series, space_between_minutes, + minute_scale, color, left, resource): + ''' + ''' + + # Memory header + result = "

%s

" \ + % (left, resource) + # Image scaling factors + scale = float(space_between_minutes/float(minute_scale)) + space_between_minutes = float(space_between_minutes/scale) + + # Iterate through time series + ts_len = len(time_series) + for idx, (ts_start, amount) in enumerate(time_series.iteritems()): + if idx < ts_len-1: + ts_end = time_series.index[idx+1] + else: + ts_end = finish_time + # Calculate offset from start at top + offset = ((ts_start-start_time).total_seconds() / 60.0) * scale * \ + space_between_minutes + 220 + # Scale duration + duration_mins = (ts_end-ts_start).total_seconds() / 60.0 + height = duration_mins * scale * \ + space_between_minutes + if height < 5: + height = 5 + height -= 2 + + # Bar width is proportional to resource amount + width = amount * 20 + + if resource.lower() == 'memory': + label = '%.3f GB' % amount + else: + label = '%d threads' % amount + + # Setup dictionary for bar html string insertion + bar_dict = {'color' : color, + 'height' : height, + 'width' : width, + 'offset': offset, + 'left' : left, + 'label' : label, + 'duration' : duration_mins, + 'start' : ts_start.strftime('%Y-%m-%d %H:%M:%S'), + 'finish' : ts_end.strftime('%Y-%m-%d %H:%M:%S')} + + bar_html = "
" + # Add another bar to html line + result += bar_html % bar_dict + + # Return bar-formatted html string + return result + + +def generate_gantt_chart(logfile, cores, minute_scale=10, + space_between_minutes=50, + colors=["#7070FF", "#4E4EB2", "#2D2D66", "#9B9BFF"]): + ''' + Generates a gantt chart in html showing the workflow execution based on a callback log file. + This script was intended to be used with the MultiprocPlugin. + The following code shows how to set up the workflow in order to generate the log file: + + Parameters + ---------- + logfile : string + filepath to the callback log file to plot the gantt chart of + cores : integer + the number of cores given to the workflow via the 'n_procs' + plugin arg + minute_scale : integer (optional); default=10 + the scale, in minutes, at which to plot line markers for the + gantt chart; for example, minute_scale=10 means there are lines + drawn at every 10 minute interval from start to finish + space_between_minutes : integer (optional); default=50 + scale factor in pixel spacing between minute line markers + colors : list (optional) + a list of colors to choose from when coloring the nodes in the + gantt chart + + + Returns + ------- + None + the function does not return any value but writes out an html + file in the same directory as the callback log path passed in + + Usage + ----- + # import logging + # import logging.handlers + # from nipype.pipeline.plugins.callback_log import log_nodes_cb + + # log_filename = 'callback.log' + # logger = logging.getLogger('callback') + # logger.setLevel(logging.DEBUG) + # handler = logging.FileHandler(log_filename) + # logger.addHandler(handler) + + # #create workflow + # workflow = ... + + # workflow.run(plugin='MultiProc', + # plugin_args={'n_procs':8, 'memory':12, 'status_callback': log_nodes_cb}) + + # generate_gantt_chart('callback.log', 8) + ''' + + #add the html header + html_string = ''' + + + + + +
+ '''
+ close_header = '''
+ Estimated Resource
+ Actual Resource
+ Failed Node
+ ''' + + # Read in json-log to get list of node dicts + nodes_list = log_to_dict(logfile) + + # Create the header of the report with useful information + start_node = nodes_list[0] + last_node = nodes_list[-1] + duration = (last_node['finish'] - start_node['start']).total_seconds() + + # Get events based dictionary of node run stats + events = create_event_dict(start_node['start'], nodes_list) + + # Summary strings of workflow at top + html_string += '

<p>Start: ' + start_node['start'].strftime("%Y-%m-%d %H:%M:%S") + '</p>' + html_string += '<p>Finish: ' + last_node['finish'].strftime("%Y-%m-%d %H:%M:%S") + '</p>' + html_string += '<p>Duration: ' + "{0:.2f}".format(duration/60) + ' minutes</p>' + html_string += '<p>Nodes: ' + str(len(nodes_list)) + '</p>' + html_string += '<p>Cores: ' + str(cores) + '</p>
' + html_string += close_header + # Draw nipype nodes Gantt chart and runtimes + html_string += draw_lines(start_node['start'], duration, minute_scale, + space_between_minutes) + html_string += draw_nodes(start_node['start'], nodes_list, cores, minute_scale, + space_between_minutes, colors) + + # Get memory timeseries + estimated_mem_ts = calculate_resource_timeseries(events, 'estimated_memory_gb') + runtime_mem_ts = calculate_resource_timeseries(events, 'runtime_memory_gb') + # Plot gantt chart + resource_offset = 120 + 30*cores + html_string += draw_resource_bar(start_node['start'], last_node['finish'], estimated_mem_ts, + space_between_minutes, minute_scale, '#90BBD7', resource_offset*2+120, 'Memory') + html_string += draw_resource_bar(start_node['start'], last_node['finish'], runtime_mem_ts, + space_between_minutes, minute_scale, '#03969D', resource_offset*2+120, 'Memory') + + # Get threads timeseries + estimated_threads_ts = calculate_resource_timeseries(events, 'estimated_threads') + runtime_threads_ts = calculate_resource_timeseries(events, 'runtime_threads') + # Plot gantt chart + html_string += draw_resource_bar(start_node['start'], last_node['finish'], estimated_threads_ts, + space_between_minutes, minute_scale, '#90BBD7', resource_offset, 'Threads') + html_string += draw_resource_bar(start_node['start'], last_node['finish'], runtime_threads_ts, + space_between_minutes, minute_scale, '#03969D', resource_offset, 'Threads') + + #finish html + html_string+= ''' +
+ ''' + + # save the html file next to the callback log + with open(logfile + '.html', 'w') as html_file: + html_file.write(html_string)
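
For quick reference, a minimal sketch of how the functions added above might be driven once a workflow has written a callback log; the 'callback.log' filename and the 8-core count are illustrative placeholders rather than values taken from this patch, and the chart is written alongside the log as callback.log.html:

    from nipype.utils.draw_gantt_chart import (log_to_dict, create_event_dict,
                                               calculate_resource_timeseries,
                                               generate_gantt_chart)

    # Parse the callback log into per-node dictionaries
    nodes = log_to_dict('callback.log')
    # Build start/finish events keyed by seconds from the pipeline start
    events = create_event_dict(nodes[0]['start'], nodes)
    # Peak concurrent memory recorded in the log (GB), per the runtime profiler
    mem_ts = calculate_resource_timeseries(events, 'runtime_memory_gb')
    print(mem_ts.max())
    # Render the full gantt chart to callback.log.html
    generate_gantt_chart('callback.log', cores=8)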