Skip to content

Commit 331e939

Browse files
authored
Merge pull request #2315 from effigies/garbage_collect
ENH: Explicit garbage collection in MultiProc
2 parents aee2e6e + 0edf2de commit 331e939

File tree

1 file changed

+25
-12
lines changed

1 file changed

+25
-12
lines changed

nipype/pipeline/plugins/multiproc.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from multiprocessing import Process, Pool, cpu_count, pool
1313
from traceback import format_exception
1414
import sys
15+
import gc
1516

1617
from copy import deepcopy
1718
import numpy as np
@@ -121,13 +122,16 @@ def __init__(self, plugin_args=None):
121122
non_daemon = self.plugin_args.get('non_daemon', True)
122123
maxtasks = self.plugin_args.get('maxtasksperchild', 10)
123124
self.processors = self.plugin_args.get('n_procs', cpu_count())
124-
self.memory_gb = self.plugin_args.get('memory_gb', # Allocate 90% of system memory
125-
get_system_total_memory_gb() * 0.9)
126-
self.raise_insufficient = self.plugin_args.get('raise_insufficient', True)
125+
self.memory_gb = self.plugin_args.get(
126+
'memory_gb', # Allocate 90% of system memory
127+
get_system_total_memory_gb() * 0.9)
128+
self.raise_insufficient = self.plugin_args.get('raise_insufficient',
129+
True)
127130

128131
# Instantiate different thread pools for non-daemon processes
129-
logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
130-
'non' * int(non_daemon), self.processors, self.memory_gb)
132+
logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d,'
133+
'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors,
134+
self.memory_gb)
131135

132136
NipypePool = NonDaemonPool if non_daemon else Pool
133137
try:
@@ -204,12 +208,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
204208
Sends jobs to workers when system resources are available.
205209
"""
206210

207-
# Check to see if a job is available (jobs without dependencies not run)
211+
# Check to see if a job is available (jobs with all dependencies run)
208212
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
209213
jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
210214

211-
# Check available system resources by summing all threads and memory used
212-
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
215+
# Check available resources by summing all threads and memory used
216+
free_memory_gb, free_processors = self._check_resources(
217+
self.pending_tasks)
213218

214219
stats = (len(self.pending_tasks), len(jobids), free_memory_gb,
215220
self.memory_gb, free_processors, self.processors)
@@ -228,7 +233,11 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
228233
'be submitted to the queue. Potential deadlock')
229234
return
230235

231-
jobids = self._sort_jobs(jobids, scheduler=self.plugin_args.get('scheduler'))
236+
jobids = self._sort_jobs(jobids,
237+
scheduler=self.plugin_args.get('scheduler'))
238+
239+
# Run garbage collector before potentially submitting jobs
240+
gc.collect()
232241

233242
# Submit jobs
234243
for jobid in jobids:
@@ -261,9 +270,10 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
261270

262271
free_memory_gb -= next_job_gb
263272
free_processors -= next_job_th
264-
logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.',
265-
self.procs[jobid].fullname, jobid, next_job_gb, next_job_th,
266-
free_memory_gb, free_processors)
273+
logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: '
274+
'%0.2fGB, %d threads.', self.procs[jobid].fullname,
275+
jobid, next_job_gb, next_job_th, free_memory_gb,
276+
free_processors)
267277

268278
# change job status in appropriate queues
269279
self.proc_done[jobid] = True
@@ -292,6 +302,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
292302
free_processors += next_job_th
293303
# Display stats next loop
294304
self._stats = None
305+
306+
# Clean up any debris from running node in main process
307+
gc.collect()
295308
continue
296309

297310
# Task should be submitted to workers

0 commit comments

Comments
 (0)