From b0828e0a4ea83fa3a6ebb9efae7d70d38eae82b3 Mon Sep 17 00:00:00 2001 From: oesteban Date: Thu, 16 Nov 2017 11:31:52 -0800 Subject: [PATCH 1/7] [ENH] Revising use of subprocess.Popen Make sure everything is tidied up after using Popen. --- nipype/interfaces/base.py | 96 +++++++++++++--------------- nipype/pipeline/engine/workflows.py | 13 ++-- nipype/pipeline/plugins/multiproc.py | 3 +- 3 files changed, 55 insertions(+), 57 deletions(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index facafa5fc9..3f32e6537a 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -9,11 +9,10 @@ Requires Packages to be installed """ from __future__ import print_function, division, unicode_literals, absolute_import -from future import standard_library -standard_library.install_aliases() +import gc + from builtins import range, object, open, str, bytes -from configparser import NoOptionError from copy import deepcopy import datetime from datetime import datetime as dt @@ -26,7 +25,6 @@ import select import subprocess as sp import sys -import time from textwrap import wrap from warnings import warn import simplejson as json @@ -43,6 +41,8 @@ traits, Undefined, TraitDictObject, TraitListObject, TraitError, isdefined, File, Directory, DictStrStr, has_metadata, ImageFile) from ..external.due import due +from future import standard_library +standard_library.install_aliases() nipype_version = Version(__version__) iflogger = logging.getLogger('interface') @@ -58,6 +58,7 @@ class Str(traits.Unicode): """Replacement for the default traits.Str based in bytes""" + traits.Str = Str @@ -634,16 +635,16 @@ def __deepcopy__(self, memo): return memo[id_self] dup_dict = deepcopy(self.get(), memo) # access all keys - for key in self.copyable_trait_names(): - if key in self.__dict__.keys(): - _ = getattr(self, key) + # for key in self.copyable_trait_names(): + # if key in self.__dict__.keys(): + # _ = getattr(self, key) # clone once dup = self.clone_traits(memo=memo) - for key in self.copyable_trait_names(): - try: - _ = getattr(dup, key) - except: - pass + # for key in self.copyable_trait_names(): + # try: + # _ = getattr(dup, key) + # except: + # pass # clone twice dup = self.clone_traits(memo=memo) dup.trait_set(**dup_dict) @@ -1260,6 +1261,7 @@ class SimpleInterface(BaseInterface): >>> os.chdir(old.strpath) """ + def __init__(self, from_file=None, resource_monitor=None, **inputs): super(SimpleInterface, self).__init__( from_file=from_file, resource_monitor=resource_monitor, **inputs) @@ -1387,8 +1389,7 @@ def run_command(runtime, output=None, timeout=0.01): shell=True, cwd=runtime.cwd, env=env, - close_fds=True, - ) + close_fds=True) result = { 'stdout': [], 'stderr': [], @@ -1427,12 +1428,7 @@ def _process(drain=0): temp.sort() result['merged'] = [r[1] for r in temp] - if output == 'allatonce': - stdout, stderr = proc.communicate() - result['stdout'] = read_stream(stdout, logger=iflogger) - result['stderr'] = read_stream(stderr, logger=iflogger) - - elif output.startswith('file'): + if output.startswith('file'): proc.wait() if outfile is not None: stdout.flush() @@ -1452,12 +1448,18 @@ def _process(drain=0): result['merged'] = result['stdout'] result['stdout'] = [] else: - proc.communicate() # Discard stdout and stderr + stdout, stderr = proc.communicate() + if output == 'allatonce': # Discard stdout and stderr otherwise + result['stdout'] = read_stream(stdout, logger=iflogger) + result['stderr'] = read_stream(stderr, logger=iflogger) + + runtime.returncode = proc.returncode + proc.terminate() # Ensure we are done + gc.collect() # Force GC for a cleanup runtime.stderr = '\n'.join(result['stderr']) runtime.stdout = '\n'.join(result['stdout']) runtime.merged = '\n'.join(result['merged']) - runtime.returncode = proc.returncode return runtime @@ -1467,21 +1469,26 @@ def get_dependencies(name, environ): Uses otool on darwin, ldd on linux. Currently doesn't support windows. """ + cmd = None if sys.platform == 'darwin': - proc = sp.Popen('otool -L `which %s`' % name, - stdout=sp.PIPE, - stderr=sp.PIPE, - shell=True, - env=environ) + cmd = 'otool -L `which {}`'.format elif 'linux' in sys.platform: - proc = sp.Popen('ldd `which %s`' % name, - stdout=sp.PIPE, - stderr=sp.PIPE, - shell=True, - env=environ) - else: + cmd = 'ldd -L `which {}`'.format + + if cmd is None: return 'Platform %s not supported' % sys.platform - o, e = proc.communicate() + + try: + proc = sp.Popen( + cmd(name), stdout=sp.PIPE, stderr=sp.PIPE, shell=True, + env=environ, close_fds=True) + o, e = proc.communicate() + proc.terminate() + gc.collect() + except: + iflogger.warning( + 'Could not get linked libraries for "%s".', name) + return 'Failed collecting dependencies' return o.rstrip() @@ -1572,6 +1579,9 @@ def __init__(self, command=None, terminal_output=None, **inputs): # Set command. Input argument takes precedence self._cmd = command or getattr(self, '_cmd', None) + # Store dependencies in runtime object + self._ldd = str2bool(config.get('execution', 'get_linked_libs', 'true')) + if self._cmd is None: raise Exception("Missing command") @@ -1619,21 +1629,6 @@ def raise_exception(self, runtime): def _get_environ(self): return getattr(self.inputs, 'environ', {}) - def version_from_command(self, flag='-v'): - cmdname = self.cmd.split()[0] - env = dict(os.environ) - if _exists_in_path(cmdname, env): - out_environ = self._get_environ() - env.update(out_environ) - proc = sp.Popen(' '.join((cmdname, flag)), - shell=True, - env=env, - stdout=sp.PIPE, - stderr=sp.PIPE, - ) - o, e = proc.communicate() - return o - def _run_interface(self, runtime, correct_return_codes=(0,)): """Execute command via subprocess @@ -1664,7 +1659,8 @@ def _run_interface(self, runtime, correct_return_codes=(0,)): (self.cmd.split()[0], runtime.hostname)) runtime.command_path = cmd_path - runtime.dependencies = get_dependencies(executable_name, runtime.environ) + runtime.dependencies = (get_dependencies(executable_name, runtime.environ) + if self._ldd else '') runtime = run_command(runtime, output=self.terminal_output) if runtime.returncode is None or \ runtime.returncode not in correct_return_codes: diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index cd50bb72b3..7734dcb37c 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -62,6 +62,7 @@ logger = logging.getLogger('workflow') + class Workflow(EngineBase): """Controls the setup and execution of a pipeline of processes.""" @@ -196,7 +197,7 @@ def connect(self, *args, **kwargs): # determine their inputs/outputs depending on # connection settings. Skip these modules in the check if dest in connected_ports[destnode]: - raise Exception(""" + raise Exception("""\ Trying to connect %s:%s to %s:%s but input '%s' of node '%s' is already connected. """ % (srcnode, source, destnode, dest, dest, destnode)) @@ -297,7 +298,7 @@ def disconnect(self, *args): remove = [] for edge in conn: if edge in ed_conns: - idx = ed_conns.index(edge) + # idx = ed_conns.index(edge) remove.append((edge[0], edge[1])) logger.debug('disconnect(): remove list %s', to_str(remove)) @@ -426,7 +427,7 @@ def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical', base_dir = os.getcwd() base_dir = make_output_dir(base_dir) if graph2use in ['hierarchical', 'colored']: - if self.name[:1].isdigit(): # these graphs break if int + if self.name[:1].isdigit(): # these graphs break if int raise ValueError('{} graph failed, workflow name cannot begin ' 'with a number'.format(graph2use)) dotfilename = op.join(base_dir, dotfilename) @@ -646,7 +647,7 @@ def _write_report_info(self, workingdir, name, graph): # Avoid RuntimeWarning: divide by zero encountered in log10 num_nodes = len(nodes) if num_nodes > 0: - index_name = np.ceil(np.log10(num_nodes)).astype(int) + index_name = np.ceil(np.log10(num_nodes)).astype(int) else: index_name = 0 template = '%%0%dd_' % index_name @@ -794,10 +795,10 @@ def _get_outputs(self): setattr(outputdict, node.name, outputs) return outputdict - def _set_input(self, object, name, newvalue): + def _set_input(self, objekt, name, newvalue): """Trait callback function to update a node input """ - object.traits()[name].node.set_input(name, newvalue) + objekt.traits()[name].node.set_input(name, newvalue) def _set_node_input(self, node, param, source, sourceinfo): """Set inputs of a node given the edge connection""" diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index ebed261185..6a9fdcafa6 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -60,6 +60,7 @@ def run_node(node, updatehash, taskid): class NonDaemonProcess(Process): """A non-daemon process to support internal multiprocessing. """ + def _get_daemon(self): return False @@ -123,7 +124,7 @@ def __init__(self, plugin_args=None): # Instantiate different thread pools for non-daemon processes logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)', - 'non' if non_daemon else '', self.processors, self.memory_gb) + 'non' * int(non_daemon), self.processors, self.memory_gb) self.pool = (NonDaemonPool if non_daemon else Pool)(processes=self.processors) self._stats = None From 1913dca23eb084bc95440c8bc83dda165ac7d524 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 17 Nov 2017 09:30:16 -0800 Subject: [PATCH 2/7] fix tests, address @effigies' comments --- nipype/interfaces/base.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index 3f32e6537a..5e9dd181c1 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1454,8 +1454,18 @@ def _process(drain=0): result['stderr'] = read_stream(stderr, logger=iflogger) runtime.returncode = proc.returncode - proc.terminate() # Ensure we are done - gc.collect() # Force GC for a cleanup + try: + proc.terminate() # Ensure we are done + except OSError as error: + # Python 2 raises when the process is already gone + if error.errno != errno.ESRCH: + raise + + # Dereference & force GC for a cleanup + del proc + del stdout + del stderr + gc.collect() runtime.stderr = '\n'.join(result['stderr']) runtime.stdout = '\n'.join(result['stdout']) @@ -1482,7 +1492,7 @@ def get_dependencies(name, environ): proc = sp.Popen( cmd(name), stdout=sp.PIPE, stderr=sp.PIPE, shell=True, env=environ, close_fds=True) - o, e = proc.communicate() + o, _ = proc.communicate() proc.terminate() gc.collect() except: @@ -1629,6 +1639,23 @@ def raise_exception(self, runtime): def _get_environ(self): return getattr(self.inputs, 'environ', {}) + def version_from_command(self, flag='-v'): + iflogger.warning('version_from_command member of CommandLine was ' + 'Deprecated in nipype-1.0.0 and deleted in 2.0.0') + cmdname = self.cmd.split()[0] + env = dict(os.environ) + if _exists_in_path(cmdname, env): + out_environ = self._get_environ() + env.update(out_environ) + proc = sp.Popen(' '.join((cmdname, flag)), + shell=True, + env=env, + stdout=sp.PIPE, + stderr=sp.PIPE, + ) + o, e = proc.communicate() + return o + def _run_interface(self, runtime, correct_return_codes=(0,)): """Execute command via subprocess From 309c6ea9c0bc3e77ad354f2b1a1869f7641b5d48 Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 21 Nov 2017 09:13:19 -0800 Subject: [PATCH 3/7] final touch --- nipype/interfaces/base.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index 5e9dd181c1..c2520bec4d 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1448,10 +1448,12 @@ def _process(drain=0): result['merged'] = result['stdout'] result['stdout'] = [] else: - stdout, stderr = proc.communicate() + stdoutstr, stderrstr = proc.communicate() if output == 'allatonce': # Discard stdout and stderr otherwise - result['stdout'] = read_stream(stdout, logger=iflogger) - result['stderr'] = read_stream(stderr, logger=iflogger) + result['stdout'] = read_stream(stdoutstr, logger=iflogger) + result['stderr'] = read_stream(stderrstr, logger=iflogger) + del stdoutstr + del stderrstr runtime.returncode = proc.returncode try: From cadcdbe4cbefd27c7c1e5b0020e1a919c544679c Mon Sep 17 00:00:00 2001 From: oesteban Date: Tue, 21 Nov 2017 09:14:24 -0800 Subject: [PATCH 4/7] update CHANGES --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index f7761f7b91..07373d1f0f 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,7 @@ Upcoming release (0.14.0) ================ +* MAINT: Revise use of `subprocess.Popen` (https://github.com/nipy/nipype/pull/2289) * FIX: Testing maintainance and improvements (https://github.com/nipy/nipype/pull/2252) * ENH: Add elapsed_time and final metric_value to ants.Registration (https://github.com/nipy/nipype/pull/1985) * ENH: Improve terminal_output feature (https://github.com/nipy/nipype/pull/2209) From ff63da8b2284258dd0f3df5de2be98f45964c27a Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 22 Nov 2017 11:04:44 -0800 Subject: [PATCH 5/7] make sure we clear up all stdout, stderr, stdoutstr, stderrstr --- nipype/interfaces/base.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index c2520bec4d..c20666a30b 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1436,6 +1436,7 @@ def _process(drain=0): with open(outfile, 'rb') as ofh: stdoutstr = ofh.read() result['stdout'] = read_stream(stdoutstr, logger=iflogger) + del stdoutstr if errfile is not None: stderr.flush() @@ -1443,17 +1444,16 @@ def _process(drain=0): with open(errfile, 'rb') as efh: stderrstr = efh.read() result['stderr'] = read_stream(stderrstr, logger=iflogger) + del stderrstr if output == 'file': result['merged'] = result['stdout'] result['stdout'] = [] else: - stdoutstr, stderrstr = proc.communicate() + stdout, stderr = proc.communicate() if output == 'allatonce': # Discard stdout and stderr otherwise - result['stdout'] = read_stream(stdoutstr, logger=iflogger) - result['stderr'] = read_stream(stderrstr, logger=iflogger) - del stdoutstr - del stderrstr + result['stdout'] = read_stream(stdout, logger=iflogger) + result['stderr'] = read_stream(stderr, logger=iflogger) runtime.returncode = proc.returncode try: From 73ff136a2cd050028db1cb4ed33cfdbf8e89e4d5 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 22 Nov 2017 11:05:53 -0800 Subject: [PATCH 6/7] fix deprecation message --- nipype/interfaces/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index c20666a30b..3c7576b039 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -1643,7 +1643,7 @@ def _get_environ(self): def version_from_command(self, flag='-v'): iflogger.warning('version_from_command member of CommandLine was ' - 'Deprecated in nipype-1.0.0 and deleted in 2.0.0') + 'Deprecated in nipype-1.0.0 and deleted in 1.1.0') cmdname = self.cmd.split()[0] env = dict(os.environ) if _exists_in_path(cmdname, env): From 1676c84b24a96757e54bd4e0997de793e9e1a548 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 22 Nov 2017 11:07:42 -0800 Subject: [PATCH 7/7] undo commenting out hacks of DynamicTraitedSpeck.__deepcopy__ --- nipype/interfaces/base.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nipype/interfaces/base.py b/nipype/interfaces/base.py index 3c7576b039..ba398352a0 100644 --- a/nipype/interfaces/base.py +++ b/nipype/interfaces/base.py @@ -635,16 +635,16 @@ def __deepcopy__(self, memo): return memo[id_self] dup_dict = deepcopy(self.get(), memo) # access all keys - # for key in self.copyable_trait_names(): - # if key in self.__dict__.keys(): - # _ = getattr(self, key) + for key in self.copyable_trait_names(): + if key in self.__dict__.keys(): + _ = getattr(self, key) # clone once dup = self.clone_traits(memo=memo) - # for key in self.copyable_trait_names(): - # try: - # _ = getattr(dup, key) - # except: - # pass + for key in self.copyable_trait_names(): + try: + _ = getattr(dup, key) + except: + pass # clone twice dup = self.clone_traits(memo=memo) dup.trait_set(**dup_dict)