Skip to content

Commit cfcc2bc

Browse files
authored
Merge branch 'master' into enh/logging-multiproc
2 parents b366e91 + 331e939 commit cfcc2bc

File tree

2 files changed

+28
-15
lines changed

2 files changed

+28
-15
lines changed

nipype/interfaces/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,11 +1343,11 @@ def _canonicalize_env(env):
13431343
return env
13441344

13451345
out_env = {}
1346-
for key, val in env:
1346+
for key, val in env.items():
13471347
if not isinstance(key, bytes):
13481348
key = key.encode('utf-8')
13491349
if not isinstance(val, bytes):
1350-
val = key.encode('utf-8')
1350+
val = val.encode('utf-8')
13511351
out_env[key] = val
13521352
return out_env
13531353

@@ -1485,7 +1485,7 @@ def get_dependencies(name, environ):
14851485
if sys.platform == 'darwin':
14861486
cmd = 'otool -L `which {}`'.format
14871487
elif 'linux' in sys.platform:
1488-
cmd = 'ldd -L `which {}`'.format
1488+
cmd = 'ldd `which {}`'.format
14891489

14901490
if cmd is None:
14911491
return 'Platform %s not supported' % sys.platform

nipype/pipeline/plugins/multiproc.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from traceback import format_exception
1414
import sys
1515
from logging import INFO
16+
import gc
1617

1718
from copy import deepcopy
1819
import numpy as np
@@ -131,13 +132,16 @@ def __init__(self, plugin_args=None):
131132
non_daemon = self.plugin_args.get('non_daemon', True)
132133
maxtasks = self.plugin_args.get('maxtasksperchild', 10)
133134
self.processors = self.plugin_args.get('n_procs', cpu_count())
134-
self.memory_gb = self.plugin_args.get('memory_gb', # Allocate 90% of system memory
135-
get_system_total_memory_gb() * 0.9)
136-
self.raise_insufficient = self.plugin_args.get('raise_insufficient', True)
135+
self.memory_gb = self.plugin_args.get(
136+
'memory_gb', # Allocate 90% of system memory
137+
get_system_total_memory_gb() * 0.9)
138+
self.raise_insufficient = self.plugin_args.get('raise_insufficient',
139+
True)
137140

138141
# Instantiate different thread pools for non-daemon processes
139-
logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
140-
'non' * int(non_daemon), self.processors, self.memory_gb)
142+
logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
143+
'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors,
144+
self.memory_gb)
141145

142146
NipypePool = NonDaemonPool if non_daemon else Pool
143147
try:
@@ -214,12 +218,13 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
214218
Sends jobs to workers when system resources are available.
215219
"""
216220

217-
# Check to see if a job is available (jobs without dependencies not run)
221+
# Check to see if a job is available (jobs with all dependencies run)
218222
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
219223
jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
220224

221-
# Check available system resources by summing all threads and memory used
222-
free_memory_gb, free_processors = self._check_resources(self.pending_tasks)
225+
# Check available resources by summing all threads and memory used
226+
free_memory_gb, free_processors = self._check_resources(
227+
self.pending_tasks)
223228

224229
stats = (len(self.pending_tasks), len(jobids), free_memory_gb,
225230
self.memory_gb, free_processors, self.processors)
@@ -248,7 +253,11 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
248253
'be submitted to the queue. Potential deadlock')
249254
return
250255

251-
jobids = self._sort_jobs(jobids, scheduler=self.plugin_args.get('scheduler'))
256+
jobids = self._sort_jobs(jobids,
257+
scheduler=self.plugin_args.get('scheduler'))
258+
259+
# Run garbage collector before potentially submitting jobs
260+
gc.collect()
252261

253262
# Submit jobs
254263
for jobid in jobids:
@@ -281,9 +290,10 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
281290

282291
free_memory_gb -= next_job_gb
283292
free_processors -= next_job_th
284-
logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: %0.2fGB, %d threads.',
285-
self.procs[jobid].fullname, jobid, next_job_gb, next_job_th,
286-
free_memory_gb, free_processors)
293+
logger.debug('Allocating %s ID=%d (%0.2fGB, %d threads). Free: '
294+
'%0.2fGB, %d threads.', self.procs[jobid].fullname,
295+
jobid, next_job_gb, next_job_th, free_memory_gb,
296+
free_processors)
287297

288298
# change job status in appropriate queues
289299
self.proc_done[jobid] = True
@@ -312,6 +322,9 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
312322
free_processors += next_job_th
313323
# Display stats next loop
314324
self._stats = None
325+
326+
# Clean up any debris from running node in main process
327+
gc.collect()
315328
continue
316329

317330
# Task should be submitted to workers

0 commit comments

Comments
 (0)