From 5573e5441106f21601bd5b6545390e0b15d78019 Mon Sep 17 00:00:00 2001
From: "Christopher J. Markiewicz"
Date: Wed, 29 Nov 2017 14:22:25 -0500
Subject: [PATCH 1/2] ENH: Explicit garbage collection in MultiProc

---
 nipype/pipeline/plugins/multiproc.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py
index b26d029518..660267c78a 100644
--- a/nipype/pipeline/plugins/multiproc.py
+++ b/nipype/pipeline/plugins/multiproc.py
@@ -12,6 +12,7 @@
 from multiprocessing import Process, Pool, cpu_count, pool
 from traceback import format_exception
 import sys
+import gc
 from copy import deepcopy
 
 import numpy as np
@@ -230,6 +231,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
 
         jobids = self._sort_jobs(jobids, scheduler=self.plugin_args.get('scheduler'))
 
+        # Run garbage collector before potentially submitting jobs
+        gc.collect()
+
         # Submit jobs
         for jobid in jobids:
             # First expand mapnodes
@@ -292,6 +296,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
                 free_processors += next_job_th
                 # Display stats next loop
                 self._stats = None
+
+                # Clean up any debris from running node in main process
+                gc.collect()
                 continue
 
             # Task should be submitted to workers
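
Background for patch 1: CPython's reference counting frees most objects
immediately, but objects caught in reference cycles must wait for the cyclic
garbage collector, so a long-running scheduler loop can hold on to memory
from nodes that have already finished. Forcing a collection before measuring
free resources makes that memory available to the next round of submissions.
A minimal standalone sketch of the effect (the Node class and the 50 MB
payload are illustrative only, not nipype code):

    import gc

    class Node(object):
        def __init__(self):
            self.result = bytearray(50 * 1024 * 1024)  # ~50 MB payload
            self.self_ref = self  # reference cycle: refcounting cannot free it

    node = Node()
    del node      # the cycle keeps the payload alive despite the del
    gc.collect()  # an explicit collection reclaims it immediately
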
""" - # Check to see if a job is available (jobs without dependencies not run) + # Check to see if a job is available (jobs with all dependencies run) # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722 jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1] - # Check available system resources by summing all threads and memory used - free_memory_gb, free_processors = self._check_resources(self.pending_tasks) + # Check available resources by summing all threads and memory used + free_memory_gb, free_processors = self._check_resources( + self.pending_tasks) stats = (len(self.pending_tasks), len(jobids), free_memory_gb, self.memory_gb, free_processors, self.processors) @@ -229,7 +233,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): 'be submitted to the queue. Potential deadlock') return - jobids = self._sort_jobs(jobids, scheduler=self.plugin_args.get('scheduler')) + jobids = self._sort_jobs(jobids, + scheduler=self.plugin_args.get('scheduler')) # Run garbage collector before potentially submitting jobs gc.collect() @@ -265,9 +270,10 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): free_memory_gb -= next_job_gb free_processors -= next_job_th - logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.', - self.procs[jobid].fullname, jobid, next_job_gb, next_job_th, - free_memory_gb, free_processors) + logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: ' + '%0.2fGB, %d threads.', self.procs[jobid].fullname, + jobid, next_job_gb, next_job_th, free_memory_gb, + free_processors) # change job status in appropriate queues self.proc_done[jobid] = True