diff --git a/CHANGES b/CHANGES index 2f2832f725..0845eee283 100644 --- a/CHANGES +++ b/CHANGES @@ -7,7 +7,8 @@ Upcoming release ###### [Full changelog](https://github.com/nipy/nipype/milestone/13) -* ENH: Memoize version checks (https://github.com/nipy/nipype/pull/2274, https://github.com/nipy/nipype/pull/2295) +* MAINT: Revise use of `subprocess.Popen` (https://github.com/nipy/nipype/pull/2289) +* ENH: Memorize version checks (https://github.com/nipy/nipype/pull/2274, https://github.com/nipy/nipype/pull/2295) 0.14.0rc1 (November 21, 2017) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index facafa5fc9..ba398352a0 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -9,11 +9,10 @@ Requires Packages to be installed """ from __future__ import print_function, division, unicode_literals, absolute_import -from future import standard_library -standard_library.install_aliases() +import gc + from builtins import range, object, open, str, bytes -from configparser import NoOptionError from copy import deepcopy import datetime from datetime import datetime as dt @@ -26,7 +25,6 @@ import select import subprocess as sp import sys -import time from textwrap import wrap from warnings import warn import simplejson as json @@ -43,6 +41,8 @@ traits, Undefined, TraitDictObject, TraitListObject, TraitError, isdefined, File, Directory, DictStrStr, has_metadata, ImageFile) from ..external.due import due +from future import standard_library +standard_library.install_aliases() nipype_version = Version(__version__) iflogger = logging.getLogger('interface') @@ -58,6 +58,7 @@ class Str(traits.Unicode): """Replacement for the default traits.Str based in bytes""" + traits.Str = Str @@ -1260,6 +1261,7 @@ class SimpleInterface(BaseInterface): >>> os.chdir(old.strpath) """ + def __init__(self, from_file=None, resource_monitor=None, **inputs): super(SimpleInterface, self).__init__( from_file=from_file, resource_monitor=resource_monitor, **inputs) @@ -1387,8 +1389,7 @@ def run_command(runtime, output=None, timeout=0.01): shell=True, cwd=runtime.cwd, env=env, - close_fds=True, - ) + close_fds=True) result = { 'stdout': [], 'stderr': [], @@ -1427,12 +1428,7 @@ def _process(drain=0): temp.sort() result['merged'] = [r[1] for r in temp] - if output == 'allatonce': - stdout, stderr = proc.communicate() - result['stdout'] = read_stream(stdout, logger=iflogger) - result['stderr'] = read_stream(stderr, logger=iflogger) - - elif output.startswith('file'): + if output.startswith('file'): proc.wait() if outfile is not None: stdout.flush() @@ -1440,6 +1436,7 @@ def _process(drain=0): with open(outfile, 'rb') as ofh: stdoutstr = ofh.read() result['stdout'] = read_stream(stdoutstr, logger=iflogger) + del stdoutstr if errfile is not None: stderr.flush() @@ -1447,17 +1444,34 @@ def _process(drain=0): with open(errfile, 'rb') as efh: stderrstr = efh.read() result['stderr'] = read_stream(stderrstr, logger=iflogger) + del stderrstr if output == 'file': result['merged'] = result['stdout'] result['stdout'] = [] else: - proc.communicate() # Discard stdout and stderr + stdout, stderr = proc.communicate() + if output == 'allatonce': # Discard stdout and stderr otherwise + result['stdout'] = read_stream(stdout, logger=iflogger) + result['stderr'] = read_stream(stderr, logger=iflogger) + + runtime.returncode = proc.returncode + try: + proc.terminate() # Ensure we are done + except OSError as error: + # Python 2 raises when the process is already gone + if error.errno != errno.ESRCH: + raise + + # Dereference & force GC for a cleanup + del proc + del stdout + del stderr + gc.collect() runtime.stderr = '\n'.join(result['stderr']) runtime.stdout = '\n'.join(result['stdout']) runtime.merged = '\n'.join(result['merged']) - runtime.returncode = proc.returncode return runtime @@ -1467,21 +1481,26 @@ def get_dependencies(name, environ): Uses otool on darwin, ldd on linux. Currently doesn't support windows. """ + cmd = None if sys.platform == 'darwin': - proc = sp.Popen('otool -L `which %s`' % name, - stdout=sp.PIPE, - stderr=sp.PIPE, - shell=True, - env=environ) + cmd = 'otool -L `which {}`'.format elif 'linux' in sys.platform: - proc = sp.Popen('ldd `which %s`' % name, - stdout=sp.PIPE, - stderr=sp.PIPE, - shell=True, - env=environ) - else: + cmd = 'ldd -L `which {}`'.format + + if cmd is None: return 'Platform %s not supported' % sys.platform - o, e = proc.communicate() + + try: + proc = sp.Popen( + cmd(name), stdout=sp.PIPE, stderr=sp.PIPE, shell=True, + env=environ, close_fds=True) + o, _ = proc.communicate() + proc.terminate() + gc.collect() + except: + iflogger.warning( + 'Could not get linked libraries for "%s".', name) + return 'Failed collecting dependencies' return o.rstrip() @@ -1572,6 +1591,9 @@ def __init__(self, command=None, terminal_output=None, **inputs): # Set command. Input argument takes precedence self._cmd = command or getattr(self, '_cmd', None) + # Store dependencies in runtime object + self._ldd = str2bool(config.get('execution', 'get_linked_libs', 'true')) + if self._cmd is None: raise Exception("Missing command") @@ -1620,6 +1642,8 @@ def _get_environ(self): return getattr(self.inputs, 'environ', {}) def version_from_command(self, flag='-v'): + iflogger.warning('version_from_command member of CommandLine was ' + 'Deprecated in nipype-1.0.0 and deleted in 1.1.0') cmdname = self.cmd.split()[0] env = dict(os.environ) if _exists_in_path(cmdname, env): @@ -1664,7 +1688,8 @@ def _run_interface(self, runtime, correct_return_codes=(0,)): (self.cmd.split()[0], runtime.hostname)) runtime.command_path = cmd_path - runtime.dependencies = get_dependencies(executable_name, runtime.environ) + runtime.dependencies = (get_dependencies(executable_name, runtime.environ) + if self._ldd else '') runtime = run_command(runtime, output=self.terminal_output) if runtime.returncode is None or \ runtime.returncode not in correct_return_codes: diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index cd50bb72b3..7734dcb37c 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -62,6 +62,7 @@ logger = logging.getLogger('workflow') + class Workflow(EngineBase): """Controls the setup and execution of a pipeline of processes.""" @@ -196,7 +197,7 @@ def connect(self, *args, **kwargs): # determine their inputs/outputs depending on # connection settings. Skip these modules in the check if dest in connected_ports[destnode]: - raise Exception(""" + raise Exception("""\ Trying to connect %s:%s to %s:%s but input '%s' of node '%s' is already connected. """ % (srcnode, source, destnode, dest, dest, destnode)) @@ -297,7 +298,7 @@ def disconnect(self, *args): remove = [] for edge in conn: if edge in ed_conns: - idx = ed_conns.index(edge) + # idx = ed_conns.index(edge) remove.append((edge[0], edge[1])) logger.debug('disconnect(): remove list %s', to_str(remove)) @@ -426,7 +427,7 @@ def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical', base_dir = os.getcwd() base_dir = make_output_dir(base_dir) if graph2use in ['hierarchical', 'colored']: - if self.name[:1].isdigit(): # these graphs break if int + if self.name[:1].isdigit(): # these graphs break if int raise ValueError('{} graph failed, workflow name cannot begin ' 'with a number'.format(graph2use)) dotfilename = op.join(base_dir, dotfilename) @@ -646,7 +647,7 @@ def _write_report_info(self, workingdir, name, graph): # Avoid RuntimeWarning: divide by zero encountered in log10 num_nodes = len(nodes) if num_nodes > 0: - index_name = np.ceil(np.log10(num_nodes)).astype(int) + index_name = np.ceil(np.log10(num_nodes)).astype(int) else: index_name = 0 template = '%%0%dd_' % index_name @@ -794,10 +795,10 @@ def _get_outputs(self): setattr(outputdict, node.name, outputs) return outputdict - def _set_input(self, object, name, newvalue): + def _set_input(self, objekt, name, newvalue): """Trait callback function to update a node input """ - object.traits()[name].node.set_input(name, newvalue) + objekt.traits()[name].node.set_input(name, newvalue) def _set_node_input(self, node, param, source, sourceinfo): """Set inputs of a node given the edge connection""" diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 595b0e1947..16bfb51a0d 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -127,7 +127,7 @@ def __init__(self, plugin_args=None): # Instantiate different thread pools for non-daemon processes logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)', - 'non' if non_daemon else '', self.processors, self.memory_gb) + 'non' * int(non_daemon), self.processors, self.memory_gb) NipypePool = NonDaemonPool if non_daemon else Pool try: