diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index f649b99fd7..86c021decd 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -12,16 +12,26 @@ from multiprocessing import Process, Pool, cpu_count, pool from traceback import format_exception import sys +from logging import INFO import gc from copy import deepcopy import numpy as np - from ... import logging from ...utils.profiler import get_system_total_memory_gb from ..engine import MapNode from .base import DistributedPluginBase +try: + from textwrap import indent +except ImportError: + def indent(text, prefix): + """ A textwrap.indent replacement for Python < 3.3 """ + if not prefix: + return text + splittext = text.splitlines(True) + return prefix + prefix.join(splittext) + # Init logger logger = logging.getLogger('workflow') @@ -129,7 +139,7 @@ def __init__(self, plugin_args=None): True) # Instantiate different thread pools for non-daemon processes - logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d,' + logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, ' 'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors, self.memory_gb) @@ -162,7 +172,7 @@ def _submit_job(self, node, updatehash=False): run_node, (node, updatehash, self._taskid), callback=self._async_callback) - logger.debug('MultiProc submitted task %s (taskid=%d).', + logger.debug('[MultiProc] Submitted task %s (taskid=%d).', node.fullname, self._taskid) return self._taskid @@ -219,9 +229,19 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): stats = (len(self.pending_tasks), len(jobids), free_memory_gb, self.memory_gb, free_processors, self.processors) if self._stats != stats: - logger.info('Currently running %d tasks, and %d jobs ready. Free ' - 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d', - *stats) + tasks_list_msg = '' + + if logger.level <= INFO: + running_tasks = [' * %s' % self.procs[jobid].fullname + for _, jobid in self.pending_tasks] + if running_tasks: + tasks_list_msg = '\nCurrently running:\n' + tasks_list_msg += '\n'.join(running_tasks) + tasks_list_msg = indent(tasks_list_msg, ' ' * 21) + logger.info('[MultiProc] Running %d tasks, and %d jobs ready. Free ' + 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s', + len(self.pending_tasks), len(jobids), free_memory_gb, self.memory_gb, + free_processors, self.processors, tasks_list_msg) self._stats = stats if free_memory_gb < 0.01 or free_processors == 0: