Skip to content

Commit b0828e0

Browse files
committed
[ENH] Revising use of subprocess.Popen
Make sure everything is tidied up after using Popen.
1 parent c4a249a commit b0828e0

File tree

3 files changed

+55
-57
lines changed

3 files changed

+55
-57
lines changed

nipype/interfaces/base.py

Lines changed: 46 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@
99
Requires Packages to be installed
1010
"""
1111
from __future__ import print_function, division, unicode_literals, absolute_import
12-
from future import standard_library
13-
standard_library.install_aliases()
12+
import gc
13+
1414
from builtins import range, object, open, str, bytes
1515

16-
from configparser import NoOptionError
1716
from copy import deepcopy
1817
import datetime
1918
from datetime import datetime as dt
@@ -26,7 +25,6 @@
2625
import select
2726
import subprocess as sp
2827
import sys
29-
import time
3028
from textwrap import wrap
3129
from warnings import warn
3230
import simplejson as json
@@ -43,6 +41,8 @@
4341
traits, Undefined, TraitDictObject, TraitListObject, TraitError, isdefined,
4442
File, Directory, DictStrStr, has_metadata, ImageFile)
4543
from ..external.due import due
44+
from future import standard_library
45+
standard_library.install_aliases()
4646

4747
nipype_version = Version(__version__)
4848
iflogger = logging.getLogger('interface')
@@ -58,6 +58,7 @@
5858
class Str(traits.Unicode):
5959
"""Replacement for the default traits.Str based in bytes"""
6060

61+
6162
traits.Str = Str
6263

6364

@@ -634,16 +635,16 @@ def __deepcopy__(self, memo):
634635
return memo[id_self]
635636
dup_dict = deepcopy(self.get(), memo)
636637
# access all keys
637-
for key in self.copyable_trait_names():
638-
if key in self.__dict__.keys():
639-
_ = getattr(self, key)
638+
# for key in self.copyable_trait_names():
639+
# if key in self.__dict__.keys():
640+
# _ = getattr(self, key)
640641
# clone once
641642
dup = self.clone_traits(memo=memo)
642-
for key in self.copyable_trait_names():
643-
try:
644-
_ = getattr(dup, key)
645-
except:
646-
pass
643+
# for key in self.copyable_trait_names():
644+
# try:
645+
# _ = getattr(dup, key)
646+
# except:
647+
# pass
647648
# clone twice
648649
dup = self.clone_traits(memo=memo)
649650
dup.trait_set(**dup_dict)
@@ -1260,6 +1261,7 @@ class SimpleInterface(BaseInterface):
12601261
>>> os.chdir(old.strpath)
12611262
12621263
"""
1264+
12631265
def __init__(self, from_file=None, resource_monitor=None, **inputs):
12641266
super(SimpleInterface, self).__init__(
12651267
from_file=from_file, resource_monitor=resource_monitor, **inputs)
@@ -1387,8 +1389,7 @@ def run_command(runtime, output=None, timeout=0.01):
13871389
shell=True,
13881390
cwd=runtime.cwd,
13891391
env=env,
1390-
close_fds=True,
1391-
)
1392+
close_fds=True)
13921393
result = {
13931394
'stdout': [],
13941395
'stderr': [],
@@ -1427,12 +1428,7 @@ def _process(drain=0):
14271428
temp.sort()
14281429
result['merged'] = [r[1] for r in temp]
14291430

1430-
if output == 'allatonce':
1431-
stdout, stderr = proc.communicate()
1432-
result['stdout'] = read_stream(stdout, logger=iflogger)
1433-
result['stderr'] = read_stream(stderr, logger=iflogger)
1434-
1435-
elif output.startswith('file'):
1431+
if output.startswith('file'):
14361432
proc.wait()
14371433
if outfile is not None:
14381434
stdout.flush()
@@ -1452,12 +1448,18 @@ def _process(drain=0):
14521448
result['merged'] = result['stdout']
14531449
result['stdout'] = []
14541450
else:
1455-
proc.communicate() # Discard stdout and stderr
1451+
stdout, stderr = proc.communicate()
1452+
if output == 'allatonce': # Discard stdout and stderr otherwise
1453+
result['stdout'] = read_stream(stdout, logger=iflogger)
1454+
result['stderr'] = read_stream(stderr, logger=iflogger)
1455+
1456+
runtime.returncode = proc.returncode
1457+
proc.terminate() # Ensure we are done
1458+
gc.collect() # Force GC for a cleanup
14561459

14571460
runtime.stderr = '\n'.join(result['stderr'])
14581461
runtime.stdout = '\n'.join(result['stdout'])
14591462
runtime.merged = '\n'.join(result['merged'])
1460-
runtime.returncode = proc.returncode
14611463
return runtime
14621464

14631465

@@ -1467,21 +1469,26 @@ def get_dependencies(name, environ):
14671469
Uses otool on darwin, ldd on linux. Currently doesn't support windows.
14681470
14691471
"""
1472+
cmd = None
14701473
if sys.platform == 'darwin':
1471-
proc = sp.Popen('otool -L `which %s`' % name,
1472-
stdout=sp.PIPE,
1473-
stderr=sp.PIPE,
1474-
shell=True,
1475-
env=environ)
1474+
cmd = 'otool -L `which {}`'.format
14761475
elif 'linux' in sys.platform:
1477-
proc = sp.Popen('ldd `which %s`' % name,
1478-
stdout=sp.PIPE,
1479-
stderr=sp.PIPE,
1480-
shell=True,
1481-
env=environ)
1482-
else:
1476+
cmd = 'ldd -L `which {}`'.format
1477+
1478+
if cmd is None:
14831479
return 'Platform %s not supported' % sys.platform
1484-
o, e = proc.communicate()
1480+
1481+
try:
1482+
proc = sp.Popen(
1483+
cmd(name), stdout=sp.PIPE, stderr=sp.PIPE, shell=True,
1484+
env=environ, close_fds=True)
1485+
o, e = proc.communicate()
1486+
proc.terminate()
1487+
gc.collect()
1488+
except:
1489+
iflogger.warning(
1490+
'Could not get linked libraries for "%s".', name)
1491+
return 'Failed collecting dependencies'
14851492
return o.rstrip()
14861493

14871494

@@ -1572,6 +1579,9 @@ def __init__(self, command=None, terminal_output=None, **inputs):
15721579
# Set command. Input argument takes precedence
15731580
self._cmd = command or getattr(self, '_cmd', None)
15741581

1582+
# Store dependencies in runtime object
1583+
self._ldd = str2bool(config.get('execution', 'get_linked_libs', 'true'))
1584+
15751585
if self._cmd is None:
15761586
raise Exception("Missing command")
15771587

@@ -1619,21 +1629,6 @@ def raise_exception(self, runtime):
16191629
def _get_environ(self):
16201630
return getattr(self.inputs, 'environ', {})
16211631

1622-
def version_from_command(self, flag='-v'):
1623-
cmdname = self.cmd.split()[0]
1624-
env = dict(os.environ)
1625-
if _exists_in_path(cmdname, env):
1626-
out_environ = self._get_environ()
1627-
env.update(out_environ)
1628-
proc = sp.Popen(' '.join((cmdname, flag)),
1629-
shell=True,
1630-
env=env,
1631-
stdout=sp.PIPE,
1632-
stderr=sp.PIPE,
1633-
)
1634-
o, e = proc.communicate()
1635-
return o
1636-
16371632
def _run_interface(self, runtime, correct_return_codes=(0,)):
16381633
"""Execute command via subprocess
16391634
@@ -1664,7 +1659,8 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
16641659
(self.cmd.split()[0], runtime.hostname))
16651660

16661661
runtime.command_path = cmd_path
1667-
runtime.dependencies = get_dependencies(executable_name, runtime.environ)
1662+
runtime.dependencies = (get_dependencies(executable_name, runtime.environ)
1663+
if self._ldd else '<skipped>')
16681664
runtime = run_command(runtime, output=self.terminal_output)
16691665
if runtime.returncode is None or \
16701666
runtime.returncode not in correct_return_codes:

nipype/pipeline/engine/workflows.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262

6363
logger = logging.getLogger('workflow')
6464

65+
6566
class Workflow(EngineBase):
6667
"""Controls the setup and execution of a pipeline of processes."""
6768

@@ -196,7 +197,7 @@ def connect(self, *args, **kwargs):
196197
# determine their inputs/outputs depending on
197198
# connection settings. Skip these modules in the check
198199
if dest in connected_ports[destnode]:
199-
raise Exception("""
200+
raise Exception("""\
200201
Trying to connect %s:%s to %s:%s but input '%s' of node '%s' is already
201202
connected.
202203
""" % (srcnode, source, destnode, dest, dest, destnode))
@@ -297,7 +298,7 @@ def disconnect(self, *args):
297298
remove = []
298299
for edge in conn:
299300
if edge in ed_conns:
300-
idx = ed_conns.index(edge)
301+
# idx = ed_conns.index(edge)
301302
remove.append((edge[0], edge[1]))
302303

303304
logger.debug('disconnect(): remove list %s', to_str(remove))
@@ -426,7 +427,7 @@ def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical',
426427
base_dir = os.getcwd()
427428
base_dir = make_output_dir(base_dir)
428429
if graph2use in ['hierarchical', 'colored']:
429-
if self.name[:1].isdigit(): # these graphs break if int
430+
if self.name[:1].isdigit(): # these graphs break if int
430431
raise ValueError('{} graph failed, workflow name cannot begin '
431432
'with a number'.format(graph2use))
432433
dotfilename = op.join(base_dir, dotfilename)
@@ -646,7 +647,7 @@ def _write_report_info(self, workingdir, name, graph):
646647
# Avoid RuntimeWarning: divide by zero encountered in log10
647648
num_nodes = len(nodes)
648649
if num_nodes > 0:
649-
index_name = np.ceil(np.log10(num_nodes)).astype(int)
650+
index_name = np.ceil(np.log10(num_nodes)).astype(int)
650651
else:
651652
index_name = 0
652653
template = '%%0%dd_' % index_name
@@ -794,10 +795,10 @@ def _get_outputs(self):
794795
setattr(outputdict, node.name, outputs)
795796
return outputdict
796797

797-
def _set_input(self, object, name, newvalue):
798+
def _set_input(self, objekt, name, newvalue):
798799
"""Trait callback function to update a node input
799800
"""
800-
object.traits()[name].node.set_input(name, newvalue)
801+
objekt.traits()[name].node.set_input(name, newvalue)
801802

802803
def _set_node_input(self, node, param, source, sourceinfo):
803804
"""Set inputs of a node given the edge connection"""

nipype/pipeline/plugins/multiproc.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def run_node(node, updatehash, taskid):
6060
class NonDaemonProcess(Process):
6161
"""A non-daemon process to support internal multiprocessing.
6262
"""
63+
6364
def _get_daemon(self):
6465
return False
6566

@@ -123,7 +124,7 @@ def __init__(self, plugin_args=None):
123124

124125
# Instantiate different thread pools for non-daemon processes
125126
logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
126-
'non' if non_daemon else '', self.processors, self.memory_gb)
127+
'non' * int(non_daemon), self.processors, self.memory_gb)
127128
self.pool = (NonDaemonPool if non_daemon else Pool)(processes=self.processors)
128129
self._stats = None
129130

0 commit comments

Comments
 (0)