Skip to content

[ENH] Revising use of subprocess.Popen #2289

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Upcoming release

###### [Full changelog](https://github.com/nipy/nipype/milestone/13)

* ENH: Memoize version checks (https://github.com/nipy/nipype/pull/2274, https://github.com/nipy/nipype/pull/2295)
* MAINT: Revise use of `subprocess.Popen` (https://github.com/nipy/nipype/pull/2289)
* ENH: Memorize version checks (https://github.com/nipy/nipype/pull/2274, https://github.com/nipy/nipype/pull/2295)


0.14.0rc1 (November 21, 2017)
Expand Down
79 changes: 52 additions & 27 deletions nipype/interfaces/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
Requires Packages to be installed
"""
from __future__ import print_function, division, unicode_literals, absolute_import
from future import standard_library
standard_library.install_aliases()
import gc

from builtins import range, object, open, str, bytes

from configparser import NoOptionError
from copy import deepcopy
import datetime
from datetime import datetime as dt
Expand All @@ -26,7 +25,6 @@
import select
import subprocess as sp
import sys
import time
from textwrap import wrap
from warnings import warn
import simplejson as json
Expand All @@ -43,6 +41,8 @@
traits, Undefined, TraitDictObject, TraitListObject, TraitError, isdefined,
File, Directory, DictStrStr, has_metadata, ImageFile)
from ..external.due import due
from future import standard_library
standard_library.install_aliases()

nipype_version = Version(__version__)
iflogger = logging.getLogger('interface')
Expand All @@ -58,6 +58,7 @@
class Str(traits.Unicode):
"""Replacement for the default traits.Str based in bytes"""


traits.Str = Str


Expand Down Expand Up @@ -1260,6 +1261,7 @@ class SimpleInterface(BaseInterface):
>>> os.chdir(old.strpath)

"""

def __init__(self, from_file=None, resource_monitor=None, **inputs):
super(SimpleInterface, self).__init__(
from_file=from_file, resource_monitor=resource_monitor, **inputs)
Expand Down Expand Up @@ -1387,8 +1389,7 @@ def run_command(runtime, output=None, timeout=0.01):
shell=True,
cwd=runtime.cwd,
env=env,
close_fds=True,
)
close_fds=True)
result = {
'stdout': [],
'stderr': [],
Expand Down Expand Up @@ -1427,37 +1428,50 @@ def _process(drain=0):
temp.sort()
result['merged'] = [r[1] for r in temp]

if output == 'allatonce':
stdout, stderr = proc.communicate()
result['stdout'] = read_stream(stdout, logger=iflogger)
result['stderr'] = read_stream(stderr, logger=iflogger)

elif output.startswith('file'):
if output.startswith('file'):
proc.wait()
if outfile is not None:
stdout.flush()
stdout.close()
with open(outfile, 'rb') as ofh:
stdoutstr = ofh.read()
result['stdout'] = read_stream(stdoutstr, logger=iflogger)
del stdoutstr

if errfile is not None:
stderr.flush()
stderr.close()
with open(errfile, 'rb') as efh:
stderrstr = efh.read()
result['stderr'] = read_stream(stderrstr, logger=iflogger)
del stderrstr

if output == 'file':
result['merged'] = result['stdout']
result['stdout'] = []
else:
proc.communicate() # Discard stdout and stderr
stdout, stderr = proc.communicate()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make these stdoutstr, stdinstr, to be consistent with if output.startswith('file'), so that the same del statement will work in all cases.

Alternately, it might be simplest to just run gc.collect() in CommandLine._run_interface, after this function is called and everything is allowed to go out of scope.

if output == 'allatonce': # Discard stdout and stderr otherwise
result['stdout'] = read_stream(stdout, logger=iflogger)
result['stderr'] = read_stream(stderr, logger=iflogger)

runtime.returncode = proc.returncode
try:
proc.terminate() # Ensure we are done
except OSError as error:
# Python 2 raises when the process is already gone
if error.errno != errno.ESRCH:
raise

# Dereference & force GC for a cleanup
del proc
del stdout
del stderr
gc.collect()

runtime.stderr = '\n'.join(result['stderr'])
runtime.stdout = '\n'.join(result['stdout'])
runtime.merged = '\n'.join(result['merged'])
runtime.returncode = proc.returncode
return runtime


Expand All @@ -1467,21 +1481,26 @@ def get_dependencies(name, environ):
Uses otool on darwin, ldd on linux. Currently doesn't support windows.

"""
cmd = None
if sys.platform == 'darwin':
proc = sp.Popen('otool -L `which %s`' % name,
stdout=sp.PIPE,
stderr=sp.PIPE,
shell=True,
env=environ)
cmd = 'otool -L `which {}`'.format
elif 'linux' in sys.platform:
proc = sp.Popen('ldd `which %s`' % name,
stdout=sp.PIPE,
stderr=sp.PIPE,
shell=True,
env=environ)
else:
cmd = 'ldd -L `which {}`'.format

if cmd is None:
return 'Platform %s not supported' % sys.platform
o, e = proc.communicate()

try:
proc = sp.Popen(
cmd(name), stdout=sp.PIPE, stderr=sp.PIPE, shell=True,
env=environ, close_fds=True)
o, _ = proc.communicate()
proc.terminate()
gc.collect()
except:
iflogger.warning(
'Could not get linked libraries for "%s".', name)
return 'Failed collecting dependencies'
return o.rstrip()


Expand Down Expand Up @@ -1572,6 +1591,9 @@ def __init__(self, command=None, terminal_output=None, **inputs):
# Set command. Input argument takes precedence
self._cmd = command or getattr(self, '_cmd', None)

# Store dependencies in runtime object
self._ldd = str2bool(config.get('execution', 'get_linked_libs', 'true'))

if self._cmd is None:
raise Exception("Missing command")

Expand Down Expand Up @@ -1620,6 +1642,8 @@ def _get_environ(self):
return getattr(self.inputs, 'environ', {})

def version_from_command(self, flag='-v'):
iflogger.warning('version_from_command member of CommandLine was '
'Deprecated in nipype-1.0.0 and deleted in 1.1.0')
cmdname = self.cmd.split()[0]
env = dict(os.environ)
if _exists_in_path(cmdname, env):
Expand Down Expand Up @@ -1664,7 +1688,8 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
(self.cmd.split()[0], runtime.hostname))

runtime.command_path = cmd_path
runtime.dependencies = get_dependencies(executable_name, runtime.environ)
runtime.dependencies = (get_dependencies(executable_name, runtime.environ)
if self._ldd else '<skipped>')
runtime = run_command(runtime, output=self.terminal_output)
if runtime.returncode is None or \
runtime.returncode not in correct_return_codes:
Expand Down
13 changes: 7 additions & 6 deletions nipype/pipeline/engine/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

logger = logging.getLogger('workflow')


class Workflow(EngineBase):
"""Controls the setup and execution of a pipeline of processes."""

Expand Down Expand Up @@ -196,7 +197,7 @@ def connect(self, *args, **kwargs):
# determine their inputs/outputs depending on
# connection settings. Skip these modules in the check
if dest in connected_ports[destnode]:
raise Exception("""
raise Exception("""\
Trying to connect %s:%s to %s:%s but input '%s' of node '%s' is already
connected.
""" % (srcnode, source, destnode, dest, dest, destnode))
Expand Down Expand Up @@ -297,7 +298,7 @@ def disconnect(self, *args):
remove = []
for edge in conn:
if edge in ed_conns:
idx = ed_conns.index(edge)
# idx = ed_conns.index(edge)
remove.append((edge[0], edge[1]))

logger.debug('disconnect(): remove list %s', to_str(remove))
Expand Down Expand Up @@ -426,7 +427,7 @@ def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical',
base_dir = os.getcwd()
base_dir = make_output_dir(base_dir)
if graph2use in ['hierarchical', 'colored']:
if self.name[:1].isdigit(): # these graphs break if int
if self.name[:1].isdigit(): # these graphs break if int
raise ValueError('{} graph failed, workflow name cannot begin '
'with a number'.format(graph2use))
dotfilename = op.join(base_dir, dotfilename)
Expand Down Expand Up @@ -646,7 +647,7 @@ def _write_report_info(self, workingdir, name, graph):
# Avoid RuntimeWarning: divide by zero encountered in log10
num_nodes = len(nodes)
if num_nodes > 0:
index_name = np.ceil(np.log10(num_nodes)).astype(int)
index_name = np.ceil(np.log10(num_nodes)).astype(int)
else:
index_name = 0
template = '%%0%dd_' % index_name
Expand Down Expand Up @@ -794,10 +795,10 @@ def _get_outputs(self):
setattr(outputdict, node.name, outputs)
return outputdict

def _set_input(self, object, name, newvalue):
def _set_input(self, objekt, name, newvalue):
"""Trait callback function to update a node input
"""
object.traits()[name].node.set_input(name, newvalue)
objekt.traits()[name].node.set_input(name, newvalue)

def _set_node_input(self, node, param, source, sourceinfo):
"""Set inputs of a node given the edge connection"""
Expand Down
2 changes: 1 addition & 1 deletion nipype/pipeline/plugins/multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def __init__(self, plugin_args=None):

# Instantiate different thread pools for non-daemon processes
logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)',
'non' if non_daemon else '', self.processors, self.memory_gb)
'non' * int(non_daemon), self.processors, self.memory_gb)

NipypePool = NonDaemonPool if non_daemon else Pool
try:
Expand Down