From 34c7092fb14b453491c7741a1384173347b02d59 Mon Sep 17 00:00:00 2001 From: Hans Johnson Date: Tue, 22 Sep 2015 11:15:01 -0500 Subject: [PATCH 01/22] COMP: Fix runtime warning from ipython IPython/parallel.py:13: ShimWarning: The `IPython.parallel` package has been deprecated. You should import from ipyparallel instead. https://github.com/ipython/ipyparallel https://ipyparallel.readthedocs.org/en/latest/ --- nipype/pipeline/plugins/ipython.py | 8 ++++---- nipype/pipeline/plugins/ipythonx.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/nipype/pipeline/plugins/ipython.py b/nipype/pipeline/plugins/ipython.py index 87cb0d686d..456d6d27bf 100644 --- a/nipype/pipeline/plugins/ipython.py +++ b/nipype/pipeline/plugins/ipython.py @@ -10,7 +10,7 @@ IPython_not_loaded = False try: from IPython import __version__ as IPyversion - from IPython.parallel.error import TimeoutError + from ipyparallel.error import TimeoutError except: IPython_not_loaded = True @@ -42,7 +42,7 @@ class IPythonPlugin(DistributedPluginBase): def __init__(self, plugin_args=None): if IPython_not_loaded: - raise ImportError('IPython parallel could not be imported') + raise ImportError('ipyparallel could not be imported') super(IPythonPlugin, self).__init__(plugin_args=plugin_args) self.iparallel = None self.taskclient = None @@ -51,11 +51,11 @@ def __init__(self, plugin_args=None): def run(self, graph, config, updatehash=False): """Executes a pre-defined pipeline is distributed approaches - based on IPython's parallel processing interface + based on IPython's ipyparallel processing interface """ # retrieve clients again try: - name = 'IPython.parallel' + name = 'ipyparallel' __import__(name) self.iparallel = sys.modules[name] except ImportError: diff --git a/nipype/pipeline/plugins/ipythonx.py b/nipype/pipeline/plugins/ipythonx.py index 28418f142f..f034f88371 100644 --- a/nipype/pipeline/plugins/ipythonx.py +++ b/nipype/pipeline/plugins/ipythonx.py @@ -21,14 +21,14 @@ class IPythonXPlugin(DistributedPluginBase): def __init__(self, plugin_args=None): if IPython_not_loaded: - raise ImportError('IPython parallel could not be imported') + raise ImportError('ipyparallel could not be imported') super(IPythonXPlugin, self).__init__(plugin_args=plugin_args) self.ipyclient = None self.taskclient = None def run(self, graph, config, updatehash=False): """Executes a pre-defined pipeline is distributed approaches - based on IPython's parallel processing interface + based on IPython's ipyparallel processing interface """ # retrieve clients again try: From 8e23884d72015f4c63345344c56319fd0b13be09 Mon Sep 17 00:00:00 2001 From: Hans Johnson Date: Tue, 29 Sep 2015 08:47:27 -0500 Subject: [PATCH 02/22] STYLE: Avoid warning with divide by zero A warning was displayed when the length of an iterable was zero. "Avoid RuntimeWarning: divide by zero encountered in log10" Avoid this warning by explicitly checking for zero and giving alternate behavior when no nodes are presented. 
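For illustration, a minimal sketch of the old failure mode and the new
guard (a standalone snippet for the commit message, not part of the
patch itself):

    import numpy as np

    num_nodes = 0  # an empty node list
    # Old behavior: np.ceil(np.log10(num_nodes)) emits
    # "RuntimeWarning: divide by zero encountered in log10" and yields -inf.
    # Guarded behavior: fall back to a zero-width index field.
    width = int(np.ceil(np.log10(num_nodes))) if num_nodes > 0 else 0
    template = '%%0%dd_' % width  # '%00d_', still a valid format string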
---
 nipype/pipeline/engine.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/nipype/pipeline/engine.py b/nipype/pipeline/engine.py
index 70b8e71754..ce681daa13 100644
--- a/nipype/pipeline/engine.py
+++ b/nipype/pipeline/engine.py
@@ -750,7 +750,14 @@ def _write_report_info(self, workingdir, name, graph):
                                            value=1))
         save_json(graph_file, json_dict)
         graph_file = op.join(report_dir, 'graph.json')
-        template = '%%0%dd_' % np.ceil(np.log10(len(nodes))).astype(int)
+        # Avoid RuntimeWarning: divide by zero encountered in log10
+        num_nodes = len(nodes)
+        if num_nodes > 0:
+            index_name = np.ceil(np.log10(num_nodes)).astype(int)
+        else:
+            index_name = 0
+        template = '%%0%dd_' % index_name
+
         def getname(u, i):
             name_parts = u.fullname.split('.')
             #return '.'.join(name_parts[:-1] + [template % i + name_parts[-1]])

From 26cc84c3f173b282fe07aa71dc665ea9e23d631f Mon Sep 17 00:00:00 2001
From: Simon R
Date: Mon, 30 Nov 2015 16:43:08 +0100
Subject: [PATCH 03/22] Added XOR to mkmask and source_file

mri_surf2vol can be run either with a source file or, if the user wants
to create a binary mask, without one. Added an XOR parameter to take
this mutually exclusive behavior into account.

---
 nipype/interfaces/freesurfer/utils.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nipype/interfaces/freesurfer/utils.py b/nipype/interfaces/freesurfer/utils.py
index 0213dc30d4..5697130131 100644
--- a/nipype/interfaces/freesurfer/utils.py
+++ b/nipype/interfaces/freesurfer/utils.py
@@ -390,7 +390,7 @@ def _gen_filename(self, name):
 
 class Surface2VolTransformInputSpec(FSTraitedSpec):
     source_file = File(exists=True, argstr='--surfval %s',
-                       copyfile=False, mandatory=True,
+                       copyfile=False, mandatory=True, xor=['mkmask'],
                        desc='This is the source of the surface values')
     hemi = traits.Str(argstr='--hemi %s', mandatory=True,
                       desc='hemisphere of data')
@@ -404,7 +404,7 @@ class Surface2VolTransformInputSpec(FSTraitedSpec):
     template_file = File(exists=True, argstr='--template %s',
                          desc='Output template volume')
     mkmask = traits.Bool(desc='make a mask instead of loading surface values',
-                         argstr='--mkmask')
+                         argstr='--mkmask', xor=['source_file'])
     vertexvol_file = File(name_template="%s_asVol_vertex.nii",
                           desc=('Path name of the vertex output volume, which '
                                 'is the same as output volume except that the '

From 51aa4f5fb12ceeeda59d6f24f26b52538056bcae Mon Sep 17 00:00:00 2001
From: Simon R
Date: Fri, 15 Jan 2016 08:08:51 +0100
Subject: [PATCH 04/22] Include OAR-args
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The arguments supplied for the oarsub command weren’t actually passed to
it. Now they are, so one can, for example, request a longer walltime.
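A hypothetical invocation, assuming an existing Workflow instance named
"workflow" (the plugin_args keys match this patch; the oarsub resource
string is only an example):

    workflow.run(plugin='OAR',
                 plugin_args={'oarsub_args': '-l "nodes=1,walltime=24:00:00"',
                              'max_tries': 4})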
---
 nipype/pipeline/plugins/oar.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/nipype/pipeline/plugins/oar.py b/nipype/pipeline/plugins/oar.py
index fab0f2513a..07ceb8e417 100644
--- a/nipype/pipeline/plugins/oar.py
+++ b/nipype/pipeline/plugins/oar.py
@@ -37,6 +37,8 @@ def __init__(self, **kwargs):
         self._max_tries = 2
         self._max_jobname_length = 15
         if 'plugin_args' in kwargs and kwargs['plugin_args']:
+            if 'oarsub_args' in kwargs['plugin_args']:
+                self._oarsub_args = kwargs['plugin_args']['oarsub_args']
             if 'retry_timeout' in kwargs['plugin_args']:
                 self._retry_timeout = kwargs['plugin_args']['retry_timeout']
             if 'max_tries' in kwargs['plugin_args']:

From adefb22d8a37fd77a0c7d153a6b6e97f58cf8c6c Mon Sep 17 00:00:00 2001
From: Simon R
Date: Fri, 15 Jan 2016 08:11:05 +0100
Subject: [PATCH 05/22] Undo Commit

---
 nipype/pipeline/plugins/oar.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/nipype/pipeline/plugins/oar.py b/nipype/pipeline/plugins/oar.py
index 07ceb8e417..fab0f2513a 100644
--- a/nipype/pipeline/plugins/oar.py
+++ b/nipype/pipeline/plugins/oar.py
@@ -37,8 +37,6 @@ def __init__(self, **kwargs):
         self._max_tries = 2
         self._max_jobname_length = 15
         if 'plugin_args' in kwargs and kwargs['plugin_args']:
-            if 'oarsub_args' in kwargs['plugin_args']:
-                self._oarsub_args = kwargs['plugin_args']['oarsub_args']
             if 'retry_timeout' in kwargs['plugin_args']:
                 self._retry_timeout = kwargs['plugin_args']['retry_timeout']
             if 'max_tries' in kwargs['plugin_args']:

From d15d89cde74eda3826267ebc85eae82952305d4b Mon Sep 17 00:00:00 2001
From: Alexandre Manhaes Savio
Date: Thu, 28 Jan 2016 13:46:24 +0100
Subject: [PATCH 06/22] enh: add PETPVC wrapper

---
 nipype/interfaces/petpvc.py | 235 ++++++++++++++++++++++++++++++++++++
 1 file changed, 235 insertions(+)
 create mode 100644 nipype/interfaces/petpvc.py

diff --git a/nipype/interfaces/petpvc.py b/nipype/interfaces/petpvc.py
new file mode 100644
index 0000000000..519282491a
--- /dev/null
+++ b/nipype/interfaces/petpvc.py
@@ -0,0 +1,235 @@
+# -*- coding: utf-8 -*-
+# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
+# vi: set ft=python sts=4 ts=4 sw=4 et:
+"""
+Nipype interface for PETPVC.
+
+PETPVC is software from the Nuclear Medicine Department
+of the UCL University Hospital, London, UK.
+
+Its source code is here: https://github.com/UCL/PETPVC
+
+The methods that it implements are explained here:
+K. Erlandsson, I. Buvat, P. H. Pretorius, B. A. Thomas, and B. F. Hutton,
+“A review of partial volume correction techniques for emission tomography
+and their applications in neurology, cardiology and oncology,” Phys. Med.
+Biol., vol. 57, no. 21, p. R119, 2012.
+
+There is a publication waiting to be accepted for this software tool.
+ + +Its command line help shows this: + + -i --input < filename > + = PET image file + -o --output < filename > + = Output file + [ -m --mask < filename > ] + = Mask image file + -p --pvc < keyword > + = Desired PVC method + -x < X > + = The full-width at half maximum in mm along x-axis + -y < Y > + = The full-width at half maximum in mm along y-axis + -z < Z > + = The full-width at half maximum in mm along z-axis + [ -d --debug ] + = Prints debug information + [ -n --iter [ Val ] ] + = Number of iterations + With: Val (Default = 10) + [ -k [ Val ] ] + = Number of deconvolution iterations + With: Val (Default = 10) + [ -a --alpha [ aval ] ] + = Alpha value + With: aval (Default = 1.5) + [ -s --stop [ stopval ] ] + = Stopping criterion + With: stopval (Default = 0.01) + +---------------------------------------------- +Technique - keyword + +Geometric transfer matrix - "GTM" +Labbe approach - "LABBE" +Richardson-Lucy - "RL" +Van-Cittert - "VC" +Region-based voxel-wise correction - "RBV" +RBV with Labbe - "LABBE+RBV" +RBV with Van-Cittert - "RBV+VC" +RBV with Richardson-Lucy - "RBV+RL" +RBV with Labbe and Van-Cittert - "LABBE+RBV+VC" +RBV with Labbe and Richardson-Lucy- "LABBE+RBV+RL" +Multi-target correction - "MTC" +MTC with Labbe - "LABBE+MTC" +MTC with Van-Cittert - "MTC+VC" +MTC with Richardson-Lucy - "MTC+RL" +MTC with Labbe and Van-Cittert - "LABBE+MTC+VC" +MTC with Labbe and Richardson-Lucy- "LABBE+MTC+RL" +Iterative Yang - "IY" +Iterative Yang with Van-Cittert - "IY+VC" +Iterative Yang with Richardson-Lucy - "IY+RL" +Muller Gartner - "MG" +Muller Gartner with Van-Cittert - "MG+VC" +Muller Gartner with Richardson-Lucy - "MG+RL" + +""" +from __future__ import print_function +from __future__ import division + +import os +import warnings + +from nipype.interfaces.base import ( + TraitedSpec, + CommandLineInputSpec, + CommandLine, + File, + isdefined, + traits, +) + +warn = warnings.warn + +pvc_methods = ['GTM', + 'IY', + 'IY+RL', + 'IY+VC', + 'LABBE', + 'LABBE+MTC', + 'LABBE+MTC+RL', + 'LABBE+MTC+VC', + 'LABBE+RBV', + 'LABBE+RBV+RL', + 'LABBE+RBV+VC', + 'MG', + 'MG+RL', + 'MG+VC', + 'MTC', + 'MTC+RL', + 'MTC+VC', + 'RBV', + 'RBV+RL', + 'RBV+VC', + 'RL', + 'VC'] + + +class PETPVCInputSpec(CommandLineInputSpec): + in_file = File(desc="PET image file", exists=True, mandatory=True, argstr="-i %s") + out_file = File(desc="Output file", genfile=True, hash_files=False, argstr="-o %s") + mask_file = File(desc="Mask image file", exists=True, mandatory=True, argstr="-m %s") + pvc = traits.Enum(pvc_methods, desc="Desired PVC method", mandatory=True, argstr="-p %s") + fwhm_x = traits.Float(desc="The full-width at half maximum in mm along x-axis", mandatory=True, argstr="-x %.4f") + fwhm_y = traits.Float(desc="The full-width at half maximum in mm along y-axis", mandatory=True, argstr="-y %.4f") + fwhm_z = traits.Float(desc="The full-width at half maximum in mm along z-axis", mandatory=True, argstr="-z %.4f") + debug = traits.Bool (desc="Prints debug information", usedefault=True, default_value=False, argstr="-d") + n_iter = traits.Int (desc="Number of iterations", default_value=10, argstr="-n %d") + n_deconv = traits.Int (desc="Number of deconvolution iterations", default_value=10, argstr="-k %d") + alpha = traits.Float(desc="Alpha value", default_value=1.5, argstr="-a %.4f") + stop_crit = traits.Float(desc="Stopping criterion", default_value=0.01, argstr="-a %.4f") + + +class PETPVCOutputSpec(TraitedSpec): + out_file = File(desc = "Output file") + + +class PETPVC(CommandLine): + """ Use PETPVC for 
partial volume correction of PET images. + + Examples + -------- + >>> from ..testing import example_data + >>> #TODO get data for PETPVC + >>> pvc = PETPVC() + >>> pvc.inputs.in_file = example_data('pet.nii.gz') + >>> pvc.inputs.mask_file = example_data('tissues.nii.gz') + >>> pvc.inputs.out_file = 'pet_pvc_rbv.nii.gz' + >>> pvc.inputs.pvc = 'RBV' + >>> pvc.inputs.fwhm_x = 2.0 + >>> pvc.inputs.fwhm_y = 2.0 + >>> pvc.inputs.fwhm_z = 2.0 + >>> outs = pvc.run() #doctest: +SKIP + """ + input_spec = PETPVCInputSpec + output_spec = PETPVCOutputSpec + _cmd = 'petpvc' + + def _list_outputs(self): + outputs = self.output_spec().get() + outputs['out_file'] = self.inputs.out_file + if not isdefined(outputs['out_file']): + method_name = self.inputs.pvc.lower() + outputs['out_file'] = self._gen_fname(self.inputs.in_file, + suffix='_{}_pvc'.format(method_name)) + + outputs['out_file'] = os.path.abspath(outputs['out_file']) + return outputs + + def _gen_fname(self, basename, cwd=None, suffix=None, change_ext=True, + ext='.nii.gz'): + """Generate a filename based on the given parameters. + + The filename will take the form: cwd/basename. + If change_ext is True, it will use the extentions specified in + intputs.output_type. + + Parameters + ---------- + basename : str + Filename to base the new filename on. + cwd : str + Path to prefix to the new filename. (default is os.getcwd()) + suffix : str + Suffix to add to the `basename`. (defaults is '' ) + change_ext : bool + Flag to change the filename extension to the given `ext`. + (Default is False) + + Returns + ------- + fname : str + New filename based on given parameters. + + """ + from nipype.utils.filemanip import fname_presuffix + + if basename == '': + msg = 'Unable to generate filename for command %s. ' % self.cmd + msg += 'basename is not set!' + raise ValueError(msg) + if cwd is None: + cwd = os.getcwd() + if change_ext: + if suffix: + suffix = ''.join((suffix, ext)) + else: + suffix = ext + if suffix is None: + suffix = '' + fname = fname_presuffix(basename, suffix=suffix, + use_ext=False, newpath=cwd) + return fname + + def _gen_filename(self, name): + if name == 'out_file': + return self._list_outputs()['out_file'] + return None + + +if __name__ == '__main__': + + #from .testing import example_data + #TODO get data for PETPVC + + pvc = PETPVC() + pvc.inputs.in_file = example_data('pet.nii.gz') + pvc.inputs.mask_file = example_data('tissues.nii.gz') + pvc.inputs.out_file = 'pet_pvc_rbv.nii.gz' + pvc.inputs.pvc = 'RBV' + pvc.inputs.fwhm_x = 2.0 + pvc.inputs.fwhm_y = 2.0 + pvc.inputs.fwhm_z = 2.0 + pvc.run() From 2ed1624c791c77b00b246062e446b1880fd9a3e8 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Thu, 28 Jan 2016 08:34:39 -0800 Subject: [PATCH 07/22] improved AFNI version parsing after publication of 16.0.01, #1328 --- nipype/interfaces/afni/base.py | 30 ++++++++++++++++------------ nipype/interfaces/afni/preprocess.py | 4 +--- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/nipype/interfaces/afni/base.py b/nipype/interfaces/afni/base.py index 22870d5d19..0f953c82e4 100644 --- a/nipype/interfaces/afni/base.py +++ b/nipype/interfaces/afni/base.py @@ -6,13 +6,14 @@ import os -import warnings +from ... 
import logging from ...utils.filemanip import split_filename from ..base import ( CommandLine, traits, CommandLineInputSpec, isdefined, File, TraitedSpec) -warn = warnings.warn +# Use nipype's logging system +iflogger = logging.getLogger('interface') class Info(object): @@ -40,26 +41,29 @@ def version(): try: clout = CommandLine(command='afni_vcheck', terminal_output='allatonce').run() + + # Try to parse the version number + currv = clout.runtime.stdout.split('\n')[1].split('=', 1)[1].strip() except IOError: # If afni_vcheck is not present, return None - warn('afni_vcheck executable not found.') + iflogger.warn('afni_vcheck executable not found.') return None except RuntimeError as e: - # If AFNI is outdated, afni_vcheck throws error - warn('AFNI is outdated') - return str(e).split('\n')[4].split('=', 1)[1].strip() - - # Try to parse the version number - out = clout.runtime.stdout.split('\n')[1].split('=', 1)[1].strip() + # If AFNI is outdated, afni_vcheck throws error. + # Show new version, but parse current anyways. + currv = str(e).split('\n')[4].split('=', 1)[1].strip() + nextv = str(e).split('\n')[6].split('=', 1)[1].strip() + iflogger.warn( + 'AFNI is outdated, detected version %s and %s is available.' % (currv, nextv)) - if out.startswith('AFNI_'): - out = out[5:] + if currv.startswith('AFNI_'): + currv = currv[5:] - v = out.split('.') + v = currv.split('.') try: v = [int(n) for n in v] except ValueError: - return out + return currv return tuple(v) @classmethod diff --git a/nipype/interfaces/afni/preprocess.py b/nipype/interfaces/afni/preprocess.py index 058a99a1af..85f2a4eaf9 100644 --- a/nipype/interfaces/afni/preprocess.py +++ b/nipype/interfaces/afni/preprocess.py @@ -8,10 +8,10 @@ >>> datadir = os.path.realpath(os.path.join(filepath, '../../testing/data')) >>> os.chdir(datadir) """ -import warnings import os import re +from warnings import warn from .base import AFNICommand, AFNICommandInputSpec, AFNICommandOutputSpec from ..base import CommandLineInputSpec, CommandLine, OutputMultiPath @@ -20,8 +20,6 @@ from ...utils.filemanip import (load_json, save_json, split_filename) from ...utils.filemanip import fname_presuffix -warn = warnings.warn - class To3DInputSpec(AFNICommandInputSpec): out_file = File(name_template="%s", desc='output image file name', From 460b3a43af8e7b2eaeb7c406f11041f93fb8c751 Mon Sep 17 00:00:00 2001 From: Alexandre Manhaes Savio Date: Thu, 28 Jan 2016 17:49:41 +0100 Subject: [PATCH 08/22] Add empty petpvc example files --- nipype/interfaces/petpvc.py | 25 ++++++------------------- nipype/testing/data/pet.nii.gz | 0 nipype/testing/data/tissues.nii.gz | 0 3 files changed, 6 insertions(+), 19 deletions(-) create mode 100644 nipype/testing/data/pet.nii.gz create mode 100644 nipype/testing/data/tissues.nii.gz diff --git a/nipype/interfaces/petpvc.py b/nipype/interfaces/petpvc.py index 519282491a..898624a48d 100644 --- a/nipype/interfaces/petpvc.py +++ b/nipype/interfaces/petpvc.py @@ -1,7 +1,10 @@ -# -*- coding: utf-8 -*- -# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- -# vi: set ft=python sts=4 ts=4 sw=4 et: """ + Change directory to provide relative paths for doctests + >>> import os + >>> filepath = os.path.dirname( os.path.realpath( __file__ ) ) + >>> datadir = os.path.realpath(os.path.join(filepath, '../testing/data')) + >>> os.chdir(datadir) + Nipype interface for PETPVC. 
PETPVC is a software from the Nuclear Medicine Department @@ -217,19 +220,3 @@ def _gen_filename(self, name): if name == 'out_file': return self._list_outputs()['out_file'] return None - - -if __name__ == '__main__': - - #from .testing import example_data - #TODO get data for PETPVC - - pvc = PETPVC() - pvc.inputs.in_file = example_data('pet.nii.gz') - pvc.inputs.mask_file = example_data('tissues.nii.gz') - pvc.inputs.out_file = 'pet_pvc_rbv.nii.gz' - pvc.inputs.pvc = 'RBV' - pvc.inputs.fwhm_x = 2.0 - pvc.inputs.fwhm_y = 2.0 - pvc.inputs.fwhm_z = 2.0 - pvc.run() diff --git a/nipype/testing/data/pet.nii.gz b/nipype/testing/data/pet.nii.gz new file mode 100644 index 0000000000..e69de29bb2 diff --git a/nipype/testing/data/tissues.nii.gz b/nipype/testing/data/tissues.nii.gz new file mode 100644 index 0000000000..e69de29bb2 From 995ecaf5680db07a7315bcf18c4b20ad8aa0d4c6 Mon Sep 17 00:00:00 2001 From: Alexandre Manhaes Savio Date: Thu, 28 Jan 2016 17:51:16 +0100 Subject: [PATCH 09/22] fix: drop example_data from petpvc --- nipype/interfaces/petpvc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nipype/interfaces/petpvc.py b/nipype/interfaces/petpvc.py index 898624a48d..bd79f32c8e 100644 --- a/nipype/interfaces/petpvc.py +++ b/nipype/interfaces/petpvc.py @@ -147,8 +147,8 @@ class PETPVC(CommandLine): >>> from ..testing import example_data >>> #TODO get data for PETPVC >>> pvc = PETPVC() - >>> pvc.inputs.in_file = example_data('pet.nii.gz') - >>> pvc.inputs.mask_file = example_data('tissues.nii.gz') + >>> pvc.inputs.in_file = 'pet.nii.gz' + >>> pvc.inputs.mask_file = 'tissues.nii.gz' >>> pvc.inputs.out_file = 'pet_pvc_rbv.nii.gz' >>> pvc.inputs.pvc = 'RBV' >>> pvc.inputs.fwhm_x = 2.0 From 74f8b78cde7a610d70d714c523d0cf0faadd38cb Mon Sep 17 00:00:00 2001 From: Chris Gorgolewski Date: Sun, 31 Jan 2016 09:58:18 -0800 Subject: [PATCH 10/22] Fixed bad conflict resolution --- nipype/pipeline/engine/nodes.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index d15f5b6a59..c128fadd89 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -72,7 +72,6 @@ from .base import EngineBase -<<<<<<< HEAD:nipype/pipeline/engine.py def _write_inputs(node): lines = [] nodename = node.fullname.replace('.', '_') From 463708c185cd68cb2db4f075a12d720611731328 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Sun, 31 Jan 2016 15:58:40 -0800 Subject: [PATCH 11/22] fixed several errors --- nipype/interfaces/dipy/preprocess.py | 4 +--- nipype/pipeline/engine/nodes.py | 14 +++++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/nipype/interfaces/dipy/preprocess.py b/nipype/interfaces/dipy/preprocess.py index 885a97a93a..5ecc49b957 100644 --- a/nipype/interfaces/dipy/preprocess.py +++ b/nipype/interfaces/dipy/preprocess.py @@ -23,9 +23,6 @@ package_check('dipy', version='0.6.0') except Exception as e: have_dipy = False -else: - from dipy.align.aniso2iso import resample - from dipy.core.gradients import GradientTable class ResampleInputSpec(TraitedSpec): @@ -172,6 +169,7 @@ def resample_proxy(in_file, order=3, new_zooms=None, out_file=None): """ Performs regridding of an image to set isotropic voxel sizes using dipy. 
""" + from dipy.align.aniso2iso import resample if out_file is None: fname, fext = op.splitext(op.basename(in_file)) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index c128fadd89..83c1a8acd3 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -28,7 +28,11 @@ from ordereddict import OrderedDict from copy import deepcopy -import pickle +try: + import cPickle as pickle +except: + import pickle + from glob import glob import gzip import inspect @@ -81,12 +85,12 @@ def _write_inputs(node): if type(val) == str: try: func = create_function_from_source(val) - except RuntimeError, e: + except RuntimeError: lines.append("%s.inputs.%s = '%s'" % (nodename, key, val)) else: funcname = [name for name in func.func_globals if name != '__builtins__'][0] - lines.append(cPickle.loads(val)) + lines.append(pickle.loads(val)) if funcname == nodename: lines[-1] = lines[-1].replace(' %s(' % funcname, ' %s_1(' % funcname) @@ -207,7 +211,7 @@ def _check_inputs(self, parameter): return hasattr(self.inputs, parameter) def _verify_name(self, name): - valid_name = bool(re.match('^[\w-]+$', name)) + valid_name = bool(re.match(r'^[\w-]+$', name)) if not valid_name: raise Exception('the name must not contain any special characters') @@ -650,7 +654,7 @@ def export(self, filename=None, prefix="output", format="python", lines.append(connect_template2 % line_args) functionlines = ['# Functions'] for function in functions: - functionlines.append(cPickle.loads(function).rstrip()) + functionlines.append(pickle.loads(function).rstrip()) all_lines = importlines + functionlines + lines if not filename: From 6be212e5f1aa4889b89f029bfb454b9e6addfb72 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Sun, 31 Jan 2016 15:59:11 -0800 Subject: [PATCH 12/22] fixed several errors --- nipype/pipeline/engine/nodes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 83c1a8acd3..d0c75081b1 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -376,7 +376,7 @@ def connect(self, *args, **kwargs): # handles the case that source is specified # with a function sourcename = source[0] - elif isinstance(source, six.string_types): + elif isinstance(source, string_types): sourcename = source else: raise Exception(('Unknown source specification in ' @@ -397,7 +397,7 @@ def connect(self, *args, **kwargs): # turn functions into strings for srcnode, destnode, connects in connection_list: for idx, (src, dest) in enumerate(connects): - if isinstance(src, tuple) and not isinstance(src[1], six.string_types): + if isinstance(src, tuple) and not isinstance(src[1], string_types): function_source = getsource(src[1]) connects[idx] = ((src[0], function_source, src[2:]), dest) @@ -918,7 +918,7 @@ def _set_input(self, object, name, newvalue): def _set_node_input(self, node, param, source, sourceinfo): """Set inputs of a node given the edge connection""" - if isinstance(sourceinfo, six.string_types): + if isinstance(sourceinfo, string_types): val = source.get_output(sourceinfo) elif isinstance(sourceinfo, tuple): if callable(sourceinfo[1]): From b73ec6d1495b9e0633a39f5a550f372b4dad5418 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Sun, 31 Jan 2016 16:36:27 -0800 Subject: [PATCH 13/22] make check-before-commit --- nipype/algorithms/tests/test_auto_ErrorMap.py | 35 - nipype/algorithms/tests/test_auto_Overlap.py | 47 - .../tests/test_auto_Surface2VolTransform.py | 2 + 
nipype/pipeline/engine/nodes.py | 1055 +---------------- 4 files changed, 3 insertions(+), 1136 deletions(-) delete mode 100644 nipype/algorithms/tests/test_auto_ErrorMap.py delete mode 100644 nipype/algorithms/tests/test_auto_Overlap.py diff --git a/nipype/algorithms/tests/test_auto_ErrorMap.py b/nipype/algorithms/tests/test_auto_ErrorMap.py deleted file mode 100644 index 69484529dd..0000000000 --- a/nipype/algorithms/tests/test_auto_ErrorMap.py +++ /dev/null @@ -1,35 +0,0 @@ -# AUTO-GENERATED by tools/checkspecs.py - DO NOT EDIT -from ...testing import assert_equal -from ..metrics import ErrorMap - - -def test_ErrorMap_inputs(): - input_map = dict(ignore_exception=dict(nohash=True, - usedefault=True, - ), - in_ref=dict(mandatory=True, - ), - in_tst=dict(mandatory=True, - ), - mask=dict(), - metric=dict(mandatory=True, - usedefault=True, - ), - out_map=dict(), - ) - inputs = ErrorMap.input_spec() - - for key, metadata in list(input_map.items()): - for metakey, value in list(metadata.items()): - yield assert_equal, getattr(inputs.traits()[key], metakey), value - - -def test_ErrorMap_outputs(): - output_map = dict(distance=dict(), - out_map=dict(), - ) - outputs = ErrorMap.output_spec() - - for key, metadata in list(output_map.items()): - for metakey, value in list(metadata.items()): - yield assert_equal, getattr(outputs.traits()[key], metakey), value diff --git a/nipype/algorithms/tests/test_auto_Overlap.py b/nipype/algorithms/tests/test_auto_Overlap.py deleted file mode 100644 index a5a3874bd1..0000000000 --- a/nipype/algorithms/tests/test_auto_Overlap.py +++ /dev/null @@ -1,47 +0,0 @@ -# AUTO-GENERATED by tools/checkspecs.py - DO NOT EDIT -from ...testing import assert_equal -from ..misc import Overlap - - -def test_Overlap_inputs(): - input_map = dict(bg_overlap=dict(mandatory=True, - usedefault=True, - ), - ignore_exception=dict(nohash=True, - usedefault=True, - ), - mask_volume=dict(), - out_file=dict(usedefault=True, - ), - vol_units=dict(mandatory=True, - usedefault=True, - ), - volume1=dict(mandatory=True, - ), - volume2=dict(mandatory=True, - ), - weighting=dict(usedefault=True, - ), - ) - inputs = Overlap.input_spec() - - for key, metadata in list(input_map.items()): - for metakey, value in list(metadata.items()): - yield assert_equal, getattr(inputs.traits()[key], metakey), value - - -def test_Overlap_outputs(): - output_map = dict(dice=dict(), - diff_file=dict(), - jaccard=dict(), - labels=dict(), - roi_di=dict(), - roi_ji=dict(), - roi_voldiff=dict(), - volume_difference=dict(), - ) - outputs = Overlap.output_spec() - - for key, metadata in list(output_map.items()): - for metakey, value in list(metadata.items()): - yield assert_equal, getattr(outputs.traits()[key], metakey), value diff --git a/nipype/interfaces/freesurfer/tests/test_auto_Surface2VolTransform.py b/nipype/interfaces/freesurfer/tests/test_auto_Surface2VolTransform.py index 42689a458b..7da293e7bd 100644 --- a/nipype/interfaces/freesurfer/tests/test_auto_Surface2VolTransform.py +++ b/nipype/interfaces/freesurfer/tests/test_auto_Surface2VolTransform.py @@ -16,6 +16,7 @@ def test_Surface2VolTransform_inputs(): usedefault=True, ), mkmask=dict(argstr='--mkmask', + xor=['source_file'], ), projfrac=dict(argstr='--projfrac %s', ), @@ -26,6 +27,7 @@ def test_Surface2VolTransform_inputs(): source_file=dict(argstr='--surfval %s', copyfile=False, mandatory=True, + xor=['mkmask'], ), subject_id=dict(argstr='--identity %s', xor=['reg_file'], diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 
d0c75081b1..9f9165e3b2 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -28,11 +28,7 @@ from ordereddict import OrderedDict from copy import deepcopy -try: - import cPickle as pickle -except: - import pickle - +import pickle from glob import glob import gzip import inspect @@ -76,1055 +72,6 @@ from .base import EngineBase -def _write_inputs(node): - lines = [] - nodename = node.fullname.replace('.', '_') - for key, _ in node.inputs.items(): - val = getattr(node.inputs, key) - if isdefined(val): - if type(val) == str: - try: - func = create_function_from_source(val) - except RuntimeError: - lines.append("%s.inputs.%s = '%s'" % (nodename, key, val)) - else: - funcname = [name for name in func.func_globals - if name != '__builtins__'][0] - lines.append(pickle.loads(val)) - if funcname == nodename: - lines[-1] = lines[-1].replace(' %s(' % funcname, - ' %s_1(' % funcname) - funcname = '%s_1' % funcname - lines.append('from nipype.utils.misc import getsource') - lines.append("%s.inputs.%s = getsource(%s)" % (nodename, - key, - funcname)) - else: - lines.append('%s.inputs.%s = %s' % (nodename, key, val)) - return lines - - -def format_node(node, format='python', include_config=False): - """Format a node in a given output syntax.""" - lines = [] - name = node.fullname.replace('.', '_') - if format == 'python': - klass = node._interface - importline = 'from %s import %s' % (klass.__module__, - klass.__class__.__name__) - comment = '# Node: %s' % node.fullname - spec = inspect.getargspec(node._interface.__init__) - args = spec.args[1:] - if args: - filled_args = [] - for arg in args: - if hasattr(node._interface, '_%s' % arg): - filled_args.append('%s=%s' % (arg, getattr(node._interface, - '_%s' % arg))) - args = ', '.join(filled_args) - else: - args = '' - klass_name = klass.__class__.__name__ - if isinstance(node, MapNode): - nodedef = '%s = MapNode(%s(%s), iterfield=%s, name="%s")' \ - % (name, klass_name, args, node.iterfield, name) - else: - nodedef = '%s = Node(%s(%s), name="%s")' \ - % (name, klass_name, args, name) - lines = [importline, comment, nodedef] - - if include_config: - lines = [importline, "from collections import OrderedDict", - comment, nodedef] - lines.append('%s.config = %s' % (name, node.config)) - - if node.iterables is not None: - lines.append('%s.iterables = %s' % (name, node.iterables)) - lines.extend(_write_inputs(node)) - - return lines - - -class WorkflowBase(object): - """Defines common attributes and functions for workflows and nodes.""" - - def __init__(self, name=None, base_dir=None): - """ Initialize base parameters of a workflow or node - - Parameters - ---------- - name : string (mandatory) - Name of this node. Name must be alphanumeric and not contain any - special characters (e.g., '.', '@'). - base_dir : string - base output directory (will be hashed before creations) - default=None, which results in the use of mkdtemp - - """ - self.base_dir = base_dir - self.config = None - self._verify_name(name) - self.name = name - # for compatibility with node expansion using iterables - self._id = self.name - self._hierarchy = None - - @property - def inputs(self): - raise NotImplementedError - - @property - def outputs(self): - raise NotImplementedError - - @property - def fullname(self): - fullname = self.name - if self._hierarchy: - fullname = self._hierarchy + '.' 
+ self.name - return fullname - - def clone(self, name): - """Clone a workflowbase object - - Parameters - ---------- - - name : string (mandatory) - A clone of node or workflow must have a new name - """ - if (name is None) or (name == self.name): - raise Exception('Cloning requires a new name') - self._verify_name(name) - clone = deepcopy(self) - clone.name = name - clone._id = name - clone._hierarchy = None - return clone - - def _check_outputs(self, parameter): - return hasattr(self.outputs, parameter) - - def _check_inputs(self, parameter): - if isinstance(self.inputs, DynamicTraitedSpec): - return True - return hasattr(self.inputs, parameter) - - def _verify_name(self, name): - valid_name = bool(re.match(r'^[\w-]+$', name)) - if not valid_name: - raise Exception('the name must not contain any special characters') - - def __repr__(self): - if self._hierarchy: - return '.'.join((self._hierarchy, self._id)) - else: - return self._id - - def save(self, filename=None): - if filename is None: - filename = 'temp.pklz' - savepkl(filename, self) - - def load(self, filename): - if '.npz' in filename: - DeprecationWarning(('npz files will be deprecated in the next ' - 'release. you can use numpy to open them.')) - return np.load(filename) - return loadpkl(filename) - - -class Workflow(WorkflowBase): - """Controls the setup and execution of a pipeline of processes.""" - - def __init__(self, name, base_dir=None): - """Create a workflow object. - - Parameters - ---------- - name : alphanumeric string - unique identifier for the workflow - base_dir : string, optional - path to workflow storage - - """ - super(Workflow, self).__init__(name, base_dir) - self._graph = nx.DiGraph() - self.config = deepcopy(config._sections) - - # PUBLIC API - def clone(self, name): - """Clone a workflow - - .. note:: - - Will reset attributes used for executing workflow. See - _init_runtime_fields. - - Parameters - ---------- - - name: alphanumeric name - unique name for the workflow - - """ - clone = super(Workflow, self).clone(name) - clone._reset_hierarchy() - return clone - - # Graph creation functions - def connect(self, *args, **kwargs): - """Connect nodes in the pipeline. - - This routine also checks if inputs and outputs are actually provided by - the nodes that are being connected. - - Creates edges in the directed graph using the nodes and edges specified - in the `connection_list`. Uses the NetworkX method - DiGraph.add_edges_from. - - Parameters - ---------- - - args : list or a set of four positional arguments - - Four positional arguments of the form:: - - connect(source, sourceoutput, dest, destinput) - - source : nodewrapper node - sourceoutput : string (must be in source.outputs) - dest : nodewrapper node - destinput : string (must be in dest.inputs) - - A list of 3-tuples of the following form:: - - [(source, target, - [('sourceoutput/attribute', 'targetinput'), - ...]), - ...] - - Or:: - - [(source, target, [(('sourceoutput1', func, arg2, ...), - 'targetinput'), ...]), - ...] 
- sourceoutput1 will always be the first argument to func - and func will be evaluated and the results sent ot targetinput - - currently func needs to define all its needed imports within the - function as we use the inspect module to get at the source code - and execute it remotely - """ - if len(args) == 1: - connection_list = args[0] - elif len(args) == 4: - connection_list = [(args[0], args[2], [(args[1], args[3])])] - else: - raise Exception('unknown set of parameters to connect function') - if not kwargs: - disconnect = False - else: - disconnect = kwargs['disconnect'] - newnodes = [] - for srcnode, destnode, _ in connection_list: - if self in [srcnode, destnode]: - msg = ('Workflow connect cannot contain itself as node:' - ' src[%s] dest[%s] workflow[%s]') % (srcnode, - destnode, - self.name) - - raise IOError(msg) - if (srcnode not in newnodes) and not self._has_node(srcnode): - newnodes.append(srcnode) - if (destnode not in newnodes) and not self._has_node(destnode): - newnodes.append(destnode) - if newnodes: - self._check_nodes(newnodes) - for node in newnodes: - if node._hierarchy is None: - node._hierarchy = self.name - not_found = [] - connected_ports = {} - for srcnode, destnode, connects in connection_list: - if destnode not in connected_ports: - connected_ports[destnode] = [] - # check to see which ports of destnode are already - # connected. - if not disconnect and (destnode in self._graph.nodes()): - for edge in self._graph.in_edges_iter(destnode): - data = self._graph.get_edge_data(*edge) - for sourceinfo, destname in data['connect']: - if destname not in connected_ports[destnode]: - connected_ports[destnode] += [destname] - for source, dest in connects: - # Currently datasource/sink/grabber.io modules - # determine their inputs/outputs depending on - # connection settings. Skip these modules in the check - if dest in connected_ports[destnode]: - raise Exception(""" -Trying to connect %s:%s to %s:%s but input '%s' of node '%s' is already -connected. 
-""" % (srcnode, source, destnode, dest, dest, destnode)) - if not (hasattr(destnode, '_interface') and - '.io' in str(destnode._interface.__class__)): - if not destnode._check_inputs(dest): - not_found.append(['in', destnode.name, dest]) - if not (hasattr(srcnode, '_interface') and - '.io' in str(srcnode._interface.__class__)): - if isinstance(source, tuple): - # handles the case that source is specified - # with a function - sourcename = source[0] - elif isinstance(source, string_types): - sourcename = source - else: - raise Exception(('Unknown source specification in ' - 'connection from output of %s') % - srcnode.name) - if sourcename and not srcnode._check_outputs(sourcename): - not_found.append(['out', srcnode.name, sourcename]) - connected_ports[destnode] += [dest] - infostr = [] - for info in not_found: - infostr += ["Module %s has no %sput called %s\n" % (info[1], - info[0], - info[2])] - if not_found: - raise Exception('\n'.join(['Some connections were not found'] + - infostr)) - - # turn functions into strings - for srcnode, destnode, connects in connection_list: - for idx, (src, dest) in enumerate(connects): - if isinstance(src, tuple) and not isinstance(src[1], string_types): - function_source = getsource(src[1]) - connects[idx] = ((src[0], function_source, src[2:]), dest) - - # add connections - for srcnode, destnode, connects in connection_list: - edge_data = self._graph.get_edge_data(srcnode, destnode, None) - if edge_data: - logger.debug('(%s, %s): Edge data exists: %s' - % (srcnode, destnode, str(edge_data))) - for data in connects: - if data not in edge_data['connect']: - edge_data['connect'].append(data) - if disconnect: - logger.debug('Removing connection: %s' % str(data)) - edge_data['connect'].remove(data) - if edge_data['connect']: - self._graph.add_edges_from([(srcnode, - destnode, - edge_data)]) - else: - #pass - logger.debug('Removing connection: %s->%s' % (srcnode, - destnode)) - self._graph.remove_edges_from([(srcnode, destnode)]) - elif not disconnect: - logger.debug('(%s, %s): No edge data' % (srcnode, destnode)) - self._graph.add_edges_from([(srcnode, destnode, - {'connect': connects})]) - edge_data = self._graph.get_edge_data(srcnode, destnode) - logger.debug('(%s, %s): new edge data: %s' % (srcnode, destnode, - str(edge_data))) - - def disconnect(self, *args): - """Disconnect two nodes - - See the docstring for connect for format. 
- """ - # yoh: explicit **dict was introduced for compatibility with Python 2.5 - return self.connect(*args, **dict(disconnect=True)) - - def add_nodes(self, nodes): - """ Add nodes to a workflow - - Parameters - ---------- - nodes : list - A list of WorkflowBase-based objects - """ - newnodes = [] - all_nodes = self._get_all_nodes() - for node in nodes: - if self._has_node(node): - raise IOError('Node %s already exists in the workflow' % node) - if isinstance(node, Workflow): - for subnode in node._get_all_nodes(): - if subnode in all_nodes: - raise IOError(('Subnode %s of node %s already exists ' - 'in the workflow') % (subnode, node)) - newnodes.append(node) - if not newnodes: - logger.debug('no new nodes to add') - return - for node in newnodes: - if not issubclass(node.__class__, WorkflowBase): - raise Exception('Node %s must be a subclass of WorkflowBase' % - str(node)) - self._check_nodes(newnodes) - for node in newnodes: - if node._hierarchy is None: - node._hierarchy = self.name - self._graph.add_nodes_from(newnodes) - - def remove_nodes(self, nodes): - """ Remove nodes from a workflow - - Parameters - ---------- - nodes : list - A list of WorkflowBase-based objects - """ - self._graph.remove_nodes_from(nodes) - - # Input-Output access - @property - def inputs(self): - return self._get_inputs() - - @property - def outputs(self): - return self._get_outputs() - - def get_node(self, name): - """Return an internal node by name - """ - nodenames = name.split('.') - nodename = nodenames[0] - outnode = [node for node in self._graph.nodes() if - str(node).endswith('.' + nodename)] - if outnode: - outnode = outnode[0] - if nodenames[1:] and issubclass(outnode.__class__, Workflow): - outnode = outnode.get_node('.'.join(nodenames[1:])) - else: - outnode = None - return outnode - - def list_node_names(self): - """List names of all nodes in a workflow - """ - outlist = [] - for node in nx.topological_sort(self._graph): - if isinstance(node, Workflow): - outlist.extend(['.'.join((node.name, nodename)) for nodename in - node.list_node_names()]) - else: - outlist.append(node.name) - return sorted(outlist) - - def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical', - format="png", simple_form=True): - """Generates a graphviz dot file and a png file - - Parameters - ---------- - - graph2use: 'orig', 'hierarchical' (default), 'flat', 'exec', 'colored' - orig - creates a top level graph without expanding internal - workflow nodes; - flat - expands workflow nodes recursively; - hierarchical - expands workflow nodes recursively with a - notion on hierarchy; - colored - expands workflow nodes recursively with a - notion on hierarchy in color; - exec - expands workflows to depict iterables - - format: 'png', 'svg' - - simple_form: boolean (default: True) - Determines if the node name used in the graph should be of the form - 'nodename (package)' when True or 'nodename.Class.package' when - False. - - """ - graphtypes = ['orig', 'flat', 'hierarchical', 'exec', 'colored'] - if graph2use not in graphtypes: - raise ValueError('Unknown graph2use keyword. 
Must be one of: ' + - str(graphtypes)) - base_dir, dotfilename = op.split(dotfilename) - if base_dir == '': - if self.base_dir: - base_dir = self.base_dir - if self.name: - base_dir = op.join(base_dir, self.name) - else: - base_dir = os.getcwd() - base_dir = make_output_dir(base_dir) - if graph2use in ['hierarchical', 'colored']: - dotfilename = op.join(base_dir, dotfilename) - self.write_hierarchical_dotfile(dotfilename=dotfilename, - colored=graph2use == "colored", - simple_form=simple_form) - format_dot(dotfilename, format=format) - else: - graph = self._graph - if graph2use in ['flat', 'exec']: - graph = self._create_flat_graph() - if graph2use == 'exec': - graph = generate_expanded_graph(deepcopy(graph)) - export_graph(graph, base_dir, dotfilename=dotfilename, - format=format, simple_form=simple_form) - - def write_hierarchical_dotfile(self, dotfilename=None, colored=False, - simple_form=True): - dotlist = ['digraph %s{' % self.name] - dotlist.append(self._get_dot(prefix=' ', colored=colored, - simple_form=simple_form)) - dotlist.append('}') - dotstr = '\n'.join(dotlist) - if dotfilename: - fp = open(dotfilename, 'wt') - fp.writelines(dotstr) - fp.close() - else: - logger.info(dotstr) - - def export(self, filename=None, prefix="output", format="python", - include_config=False): - """Export object into a different format - - Parameters - ---------- - filename: string - file to save the code to; overrides prefix - prefix: string - prefix to use for output file - format: string - one of "python" - include_config: boolean - whether to include node and workflow config values - - """ - formats = ["python"] - if format not in formats: - raise ValueError('format must be one of: %s' % '|'.join(formats)) - flatgraph = self._create_flat_graph() - nodes = nx.topological_sort(flatgraph) - - lines = ['# Workflow'] - importlines = ['from nipype.pipeline.engine import Workflow, ' - 'Node, MapNode'] - functions = {} - if format == "python": - connect_template = '%s.connect(%%s, %%s, %%s, "%%s")' % self.name - connect_template2 = '%s.connect(%%s, "%%s", %%s, "%%s")' \ - % self.name - wfdef = '%s = Workflow("%s")' % (self.name, self.name) - lines.append(wfdef) - if include_config: - lines.append('%s.config = %s' % (self.name, self.config)) - for idx, node in enumerate(nodes): - nodename = node.fullname.replace('.', '_') - # write nodes - nodelines = format_node(node, format='python', - include_config=include_config) - for line in nodelines: - if line.startswith('from'): - if line not in importlines: - importlines.append(line) - else: - lines.append(line) - # write connections - for u, _, d in flatgraph.in_edges_iter(nbunch=node, - data=True): - for cd in d['connect']: - if isinstance(cd[0], tuple): - args = list(cd[0]) - if args[1] in functions: - funcname = functions[args[1]] - else: - func = create_function_from_source(args[1]) - funcname = [name for name in func.func_globals - if name != '__builtins__'][0] - functions[args[1]] = funcname - args[1] = funcname - args = tuple([arg for arg in args if arg]) - line_args = (u.fullname.replace('.', '_'), - args, nodename, cd[1]) - line = connect_template % line_args - line = line.replace("'%s'" % funcname, funcname) - lines.append(line) - else: - line_args = (u.fullname.replace('.', '_'), - cd[0], nodename, cd[1]) - lines.append(connect_template2 % line_args) - functionlines = ['# Functions'] - for function in functions: - functionlines.append(pickle.loads(function).rstrip()) - all_lines = importlines + functionlines + lines - - if not filename: - filename = 
'%s%s.py' % (prefix, self.name) - with open(filename, 'wt') as fp: - fp.writelines('\n'.join(all_lines)) - return all_lines - - def run(self, plugin=None, plugin_args=None, updatehash=False): - """ Execute the workflow - - Parameters - ---------- - - plugin: plugin name or object - Plugin to use for execution. You can create your own plugins for - execution. - plugin_args : dictionary containing arguments to be sent to plugin - constructor. see individual plugin doc strings for details. - """ - if plugin is None: - plugin = config.get('execution', 'plugin') - if type(plugin) is not str: - runner = plugin - else: - name = 'nipype.pipeline.plugins' - try: - __import__(name) - except ImportError: - msg = 'Could not import plugin module: %s' % name - logger.error(msg) - raise ImportError(msg) - else: - plugin_mod = getattr(sys.modules[name], '%sPlugin' % plugin) - runner = plugin_mod(plugin_args=plugin_args) - flatgraph = self._create_flat_graph() - self.config = merge_dict(deepcopy(config._sections), self.config) - if 'crashdump_dir' in self.config: - warn(("Deprecated: workflow.config['crashdump_dir']\n" - "Please use config['execution']['crashdump_dir']")) - crash_dir = self.config['crashdump_dir'] - self.config['execution']['crashdump_dir'] = crash_dir - del self.config['crashdump_dir'] - logger.info(str(sorted(self.config))) - self._set_needed_outputs(flatgraph) - execgraph = generate_expanded_graph(deepcopy(flatgraph)) - for index, node in enumerate(execgraph.nodes()): - node.config = merge_dict(deepcopy(self.config), node.config) - node.base_dir = self.base_dir - node.index = index - if isinstance(node, MapNode): - node.use_plugin = (plugin, plugin_args) - self._configure_exec_nodes(execgraph) - if str2bool(self.config['execution']['create_report']): - self._write_report_info(self.base_dir, self.name, execgraph) - runner.run(execgraph, updatehash=updatehash, config=self.config) - datestr = datetime.utcnow().strftime('%Y%m%dT%H%M%S') - if str2bool(self.config['execution']['write_provenance']): - prov_base = op.join(self.base_dir, - 'workflow_provenance_%s' % datestr) - logger.info('Provenance file prefix: %s' % prov_base) - write_workflow_prov(execgraph, prov_base, format='all') - return execgraph - - # PRIVATE API AND FUNCTIONS - - def _write_report_info(self, workingdir, name, graph): - if workingdir is None: - workingdir = os.getcwd() - report_dir = op.join(workingdir, name) - if not op.exists(report_dir): - os.makedirs(report_dir) - shutil.copyfile(op.join(op.dirname(__file__), - 'report_template.html'), - op.join(report_dir, 'index.html')) - shutil.copyfile(op.join(op.dirname(__file__), - '..', 'external', 'd3.js'), - op.join(report_dir, 'd3.js')) - nodes, groups = topological_sort(graph, depth_first=True) - graph_file = op.join(report_dir, 'graph1.json') - json_dict = {'nodes': [], 'links': [], 'groups': [], 'maxN': 0} - for i, node in enumerate(nodes): - report_file = "%s/_report/report.rst" % \ - node.output_dir().replace(report_dir, '') - result_file = "%s/result_%s.pklz" % \ - (node.output_dir().replace(report_dir, ''), - node.name) - json_dict['nodes'].append(dict(name='%d_%s' % (i, node.name), - report=report_file, - result=result_file, - group=groups[i])) - maxN = 0 - for gid in np.unique(groups): - procs = [i for i, val in enumerate(groups) if val == gid] - N = len(procs) - if N > maxN: - maxN = N - json_dict['groups'].append(dict(procs=procs, - total=N, - name='Group_%05d' % gid)) - json_dict['maxN'] = maxN - for u, v in graph.in_edges_iter(): - 
json_dict['links'].append(dict(source=nodes.index(u), - target=nodes.index(v), - value=1)) - save_json(graph_file, json_dict) - graph_file = op.join(report_dir, 'graph.json') - # Avoid RuntimeWarning: divide by zero encountered in log10 - num_nodes = len(nodes) - if num_nodes > 0: - index_name = np.ceil(np.log10(num_nodes)).astype(int) - else: - index_name = 0 - template = '%%0%dd_' % index_name - - def getname(u, i): - name_parts = u.fullname.split('.') - #return '.'.join(name_parts[:-1] + [template % i + name_parts[-1]]) - return template % i + name_parts[-1] - json_dict = [] - for i, node in enumerate(nodes): - imports = [] - for u, v in graph.in_edges_iter(nbunch=node): - imports.append(getname(u, nodes.index(u))) - json_dict.append(dict(name=getname(node, i), - size=1, - group=groups[i], - imports=imports)) - save_json(graph_file, json_dict) - - def _set_needed_outputs(self, graph): - """Initialize node with list of which outputs are needed.""" - rm_outputs = self.config['execution']['remove_unnecessary_outputs'] - if not str2bool(rm_outputs): - return - for node in graph.nodes(): - node.needed_outputs = [] - for edge in graph.out_edges_iter(node): - data = graph.get_edge_data(*edge) - for sourceinfo, _ in sorted(data['connect']): - if isinstance(sourceinfo, tuple): - input_name = sourceinfo[0] - else: - input_name = sourceinfo - if input_name not in node.needed_outputs: - node.needed_outputs += [input_name] - if node.needed_outputs: - node.needed_outputs = sorted(node.needed_outputs) - - def _configure_exec_nodes(self, graph): - """Ensure that each node knows where to get inputs from - """ - for node in graph.nodes(): - node.input_source = {} - for edge in graph.in_edges_iter(node): - data = graph.get_edge_data(*edge) - for sourceinfo, field in sorted(data['connect']): - node.input_source[field] = \ - (op.join(edge[0].output_dir(), - 'result_%s.pklz' % edge[0].name), - sourceinfo) - - def _check_nodes(self, nodes): - """Checks if any of the nodes are already in the graph - - """ - node_names = [node.name for node in self._graph.nodes()] - node_lineage = [node._hierarchy for node in self._graph.nodes()] - for node in nodes: - if node.name in node_names: - idx = node_names.index(node.name) - if node_lineage[idx] in [node._hierarchy, self.name]: - raise IOError('Duplicate node name %s found.' 
% node.name) - else: - node_names.append(node.name) - - def _has_attr(self, parameter, subtype='in'): - """Checks if a parameter is available as an input or output - """ - if subtype == 'in': - subobject = self.inputs - else: - subobject = self.outputs - attrlist = parameter.split('.') - cur_out = subobject - for attr in attrlist: - if not hasattr(cur_out, attr): - return False - cur_out = getattr(cur_out, attr) - return True - - def _get_parameter_node(self, parameter, subtype='in'): - """Returns the underlying node corresponding to an input or - output parameter - """ - if subtype == 'in': - subobject = self.inputs - else: - subobject = self.outputs - attrlist = parameter.split('.') - cur_out = subobject - for attr in attrlist[:-1]: - cur_out = getattr(cur_out, attr) - return cur_out.traits()[attrlist[-1]].node - - def _check_outputs(self, parameter): - return self._has_attr(parameter, subtype='out') - - def _check_inputs(self, parameter): - return self._has_attr(parameter, subtype='in') - - def _get_inputs(self): - """Returns the inputs of a workflow - - This function does not return any input ports that are already - connected - """ - inputdict = TraitedSpec() - for node in self._graph.nodes(): - inputdict.add_trait(node.name, traits.Instance(TraitedSpec)) - if isinstance(node, Workflow): - setattr(inputdict, node.name, node.inputs) - else: - taken_inputs = [] - for _, _, d in self._graph.in_edges_iter(nbunch=node, - data=True): - for cd in d['connect']: - taken_inputs.append(cd[1]) - unconnectedinputs = TraitedSpec() - for key, trait in node.inputs.items(): - if key not in taken_inputs: - unconnectedinputs.add_trait(key, - traits.Trait(trait, - node=node)) - value = getattr(node.inputs, key) - setattr(unconnectedinputs, key, value) - setattr(inputdict, node.name, unconnectedinputs) - getattr(inputdict, node.name).on_trait_change(self._set_input) - return inputdict - - def _get_outputs(self): - """Returns all possible output ports that are not already connected - """ - outputdict = TraitedSpec() - for node in self._graph.nodes(): - outputdict.add_trait(node.name, traits.Instance(TraitedSpec)) - if isinstance(node, Workflow): - setattr(outputdict, node.name, node.outputs) - elif node.outputs: - outputs = TraitedSpec() - for key, _ in node.outputs.items(): - outputs.add_trait(key, traits.Any(node=node)) - setattr(outputs, key, None) - setattr(outputdict, node.name, outputs) - return outputdict - - def _set_input(self, object, name, newvalue): - """Trait callback function to update a node input - """ - object.traits()[name].node.set_input(name, newvalue) - - def _set_node_input(self, node, param, source, sourceinfo): - """Set inputs of a node given the edge connection""" - if isinstance(sourceinfo, string_types): - val = source.get_output(sourceinfo) - elif isinstance(sourceinfo, tuple): - if callable(sourceinfo[1]): - val = sourceinfo[1](source.get_output(sourceinfo[0]), - *sourceinfo[2:]) - newval = val - if isinstance(val, TraitDictObject): - newval = dict(val) - if isinstance(val, TraitListObject): - newval = val[:] - logger.debug('setting node input: %s->%s', param, str(newval)) - node.set_input(param, deepcopy(newval)) - - def _get_all_nodes(self): - allnodes = [] - for node in self._graph.nodes(): - if isinstance(node, Workflow): - allnodes.extend(node._get_all_nodes()) - else: - allnodes.append(node) - return allnodes - - def _has_node(self, wanted_node): - for node in self._graph.nodes(): - if wanted_node == node: - return True - if isinstance(node, Workflow): - if 
node._has_node(wanted_node): - return True - return False - - def _create_flat_graph(self): - """Make a simple DAG where no node is a workflow.""" - logger.debug('Creating flat graph for workflow: %s', self.name) - workflowcopy = deepcopy(self) - workflowcopy._generate_flatgraph() - return workflowcopy._graph - - def _reset_hierarchy(self): - """Reset the hierarchy on a graph - """ - for node in self._graph.nodes(): - if isinstance(node, Workflow): - node._reset_hierarchy() - for innernode in node._graph.nodes(): - innernode._hierarchy = '.'.join((self.name, - innernode._hierarchy)) - else: - node._hierarchy = self.name - - def _generate_flatgraph(self): - """Generate a graph containing only Nodes or MapNodes - """ - logger.debug('expanding workflow: %s', self) - nodes2remove = [] - if not nx.is_directed_acyclic_graph(self._graph): - raise Exception(('Workflow: %s is not a directed acyclic graph ' - '(DAG)') % self.name) - nodes = nx.topological_sort(self._graph) - for node in nodes: - logger.debug('processing node: %s' % node) - if isinstance(node, Workflow): - nodes2remove.append(node) - # use in_edges instead of in_edges_iter to allow - # disconnections to take place properly. otherwise, the - # edge dict is modified. - for u, _, d in self._graph.in_edges(nbunch=node, data=True): - logger.debug('in: connections-> %s' % str(d['connect'])) - for cd in deepcopy(d['connect']): - logger.debug("in: %s" % str(cd)) - dstnode = node._get_parameter_node(cd[1], subtype='in') - srcnode = u - srcout = cd[0] - dstin = cd[1].split('.')[-1] - logger.debug('in edges: %s %s %s %s' % - (srcnode, srcout, dstnode, dstin)) - self.disconnect(u, cd[0], node, cd[1]) - self.connect(srcnode, srcout, dstnode, dstin) - # do not use out_edges_iter for reasons stated in in_edges - for _, v, d in self._graph.out_edges(nbunch=node, data=True): - logger.debug('out: connections-> %s' % str(d['connect'])) - for cd in deepcopy(d['connect']): - logger.debug("out: %s" % str(cd)) - dstnode = v - if isinstance(cd[0], tuple): - parameter = cd[0][0] - else: - parameter = cd[0] - srcnode = node._get_parameter_node(parameter, - subtype='out') - if isinstance(cd[0], tuple): - srcout = list(cd[0]) - srcout[0] = parameter.split('.')[-1] - srcout = tuple(srcout) - else: - srcout = parameter.split('.')[-1] - dstin = cd[1] - logger.debug('out edges: %s %s %s %s' % (srcnode, - srcout, - dstnode, - dstin)) - self.disconnect(node, cd[0], v, cd[1]) - self.connect(srcnode, srcout, dstnode, dstin) - # expand the workflow node - #logger.debug('expanding workflow: %s', node) - node._generate_flatgraph() - for innernode in node._graph.nodes(): - innernode._hierarchy = '.'.join((self.name, - innernode._hierarchy)) - self._graph.add_nodes_from(node._graph.nodes()) - self._graph.add_edges_from(node._graph.edges(data=True)) - if nodes2remove: - self._graph.remove_nodes_from(nodes2remove) - logger.debug('finished expanding workflow: %s', self) - - def _get_dot(self, prefix=None, hierarchy=None, colored=False, - simple_form=True, level=0): - """Create a dot file with connection info - """ - if prefix is None: - prefix = ' ' - if hierarchy is None: - hierarchy = [] - colorset = ['#FFFFC8','#0000FF','#B4B4FF','#E6E6FF','#FF0000', - '#FFB4B4','#FFE6E6','#00A300','#B4FFB4','#E6FFE6'] - - dotlist = ['%slabel="%s";' % (prefix, self.name)] - for node in nx.topological_sort(self._graph): - fullname = '.'.join(hierarchy + [node.fullname]) - nodename = fullname.replace('.', '_') - if not isinstance(node, Workflow): - node_class_name = get_print_name(node, 
simple_form=simple_form) - if not simple_form: - node_class_name = '.'.join(node_class_name.split('.')[1:]) - if hasattr(node, 'iterables') and node.iterables: - dotlist.append(('%s[label="%s", shape=box3d,' - 'style=filled, color=black, colorscheme' - '=greys7 fillcolor=2];') % (nodename, - node_class_name)) - else: - if colored: - dotlist.append(('%s[label="%s", style=filled,' - ' fillcolor="%s"];') - % (nodename,node_class_name, - colorset[level])) - else: - dotlist.append(('%s[label="%s"];') - % (nodename,node_class_name)) - - for node in nx.topological_sort(self._graph): - if isinstance(node, Workflow): - fullname = '.'.join(hierarchy + [node.fullname]) - nodename = fullname.replace('.', '_') - dotlist.append('subgraph cluster_%s {' % nodename) - if colored: - dotlist.append(prefix + prefix + 'edge [color="%s"];' % (colorset[level+1])) - dotlist.append(prefix + prefix + 'style=filled;') - dotlist.append(prefix + prefix + 'fillcolor="%s";' % (colorset[level+2])) - dotlist.append(node._get_dot(prefix=prefix + prefix, - hierarchy=hierarchy + [self.name], - colored=colored, - simple_form=simple_form, level=level+3)) - dotlist.append('}') - if level==6:level=2 - else: - for subnode in self._graph.successors_iter(node): - if node._hierarchy != subnode._hierarchy: - continue - if not isinstance(subnode, Workflow): - nodefullname = '.'.join(hierarchy + [node.fullname]) - subnodefullname = '.'.join(hierarchy + - [subnode.fullname]) - nodename = nodefullname.replace('.', '_') - subnodename = subnodefullname.replace('.', '_') - for _ in self._graph.get_edge_data(node, - subnode)['connect']: - dotlist.append('%s -> %s;' % (nodename, - subnodename)) - logger.debug('connection: ' + dotlist[-1]) - # add between workflow connections - for u, v, d in self._graph.edges_iter(data=True): - uname = '.'.join(hierarchy + [u.fullname]) - vname = '.'.join(hierarchy + [v.fullname]) - for src, dest in d['connect']: - uname1 = uname - vname1 = vname - if isinstance(src, tuple): - srcname = src[0] - else: - srcname = src - if '.' in srcname: - uname1 += '.' + '.'.join(srcname.split('.')[:-1]) - if '.' in dest and '@' not in dest: - if not isinstance(v, Workflow): - if 'datasink' not in \ - str(v._interface.__class__).lower(): - vname1 += '.' + '.'.join(dest.split('.')[:-1]) - else: - vname1 += '.' 
+ '.'.join(dest.split('.')[:-1]) - if uname1.split('.')[:-1] != vname1.split('.')[:-1]: - dotlist.append('%s -> %s;' % (uname1.replace('.', '_'), - vname1.replace('.', '_'))) - logger.debug('cross connection: ' + dotlist[-1]) - return ('\n' + prefix).join(dotlist) - - class Node(EngineBase): """Wraps interface objects for use in pipeline From 360584d7d74f30b436eee23872232dd650eacd59 Mon Sep 17 00:00:00 2001 From: Chris Gorgolewski Date: Sun, 31 Jan 2016 18:39:45 -0800 Subject: [PATCH 14/22] enhance circle CI error reporting --- nipype/pipeline/engine/nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index c128fadd89..38c2b33bc3 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -81,7 +81,7 @@ def _write_inputs(node): if type(val) == str: try: func = create_function_from_source(val) - except RuntimeError, e: + except RuntimeError: lines.append("%s.inputs.%s = '%s'" % (nodename, key, val)) else: funcname = [name for name in func.func_globals From 69d6c781b7a662baaceb15146a1e1a9ea3bc9e6c Mon Sep 17 00:00:00 2001 From: Chris Gorgolewski Date: Sun, 31 Jan 2016 18:50:52 -0800 Subject: [PATCH 15/22] further fixes --- nipype/pipeline/engine/nodes.py | 980 ---------------------------- nipype/pipeline/engine/workflows.py | 8 +- 2 files changed, 7 insertions(+), 981 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index e2b202d02f..9f9165e3b2 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -72,986 +72,6 @@ from .base import EngineBase -class WorkflowBase(object): - """Defines common attributes and functions for workflows and nodes.""" - - def __init__(self, name=None, base_dir=None): - """ Initialize base parameters of a workflow or node - - Parameters - ---------- - name : string (mandatory) - Name of this node. Name must be alphanumeric and not contain any - special characters (e.g., '.', '@'). - base_dir : string - base output directory (will be hashed before creations) - default=None, which results in the use of mkdtemp - - """ - self.base_dir = base_dir - self.config = None - self._verify_name(name) - self.name = name - # for compatibility with node expansion using iterables - self._id = self.name - self._hierarchy = None - - @property - def inputs(self): - raise NotImplementedError - - @property - def outputs(self): - raise NotImplementedError - - @property - def fullname(self): - fullname = self.name - if self._hierarchy: - fullname = self._hierarchy + '.' 
+ self.name - return fullname - - def clone(self, name): - """Clone a workflowbase object - - Parameters - ---------- - - name : string (mandatory) - A clone of node or workflow must have a new name - """ - if (name is None) or (name == self.name): - raise Exception('Cloning requires a new name') - self._verify_name(name) - clone = deepcopy(self) - clone.name = name - clone._id = name - clone._hierarchy = None - return clone - - def _check_outputs(self, parameter): - return hasattr(self.outputs, parameter) - - def _check_inputs(self, parameter): - if isinstance(self.inputs, DynamicTraitedSpec): - return True - return hasattr(self.inputs, parameter) - - def _verify_name(self, name): - valid_name = bool(re.match('^[\w-]+$', name)) - if not valid_name: - raise Exception('the name must not contain any special characters') - - def __repr__(self): - if self._hierarchy: - return '.'.join((self._hierarchy, self._id)) - else: - return self._id - - def save(self, filename=None): - if filename is None: - filename = 'temp.pklz' - savepkl(filename, self) - - def load(self, filename): - if '.npz' in filename: - DeprecationWarning(('npz files will be deprecated in the next ' - 'release. you can use numpy to open them.')) - return np.load(filename) - return loadpkl(filename) - - -class Workflow(WorkflowBase): - """Controls the setup and execution of a pipeline of processes.""" - - def __init__(self, name, base_dir=None): - """Create a workflow object. - - Parameters - ---------- - name : alphanumeric string - unique identifier for the workflow - base_dir : string, optional - path to workflow storage - - """ - super(Workflow, self).__init__(name, base_dir) - self._graph = nx.DiGraph() - self.config = deepcopy(config._sections) - - # PUBLIC API - def clone(self, name): - """Clone a workflow - - .. note:: - - Will reset attributes used for executing workflow. See - _init_runtime_fields. - - Parameters - ---------- - - name: alphanumeric name - unique name for the workflow - - """ - clone = super(Workflow, self).clone(name) - clone._reset_hierarchy() - return clone - - # Graph creation functions - def connect(self, *args, **kwargs): - """Connect nodes in the pipeline. - - This routine also checks if inputs and outputs are actually provided by - the nodes that are being connected. - - Creates edges in the directed graph using the nodes and edges specified - in the `connection_list`. Uses the NetworkX method - DiGraph.add_edges_from. - - Parameters - ---------- - - args : list or a set of four positional arguments - - Four positional arguments of the form:: - - connect(source, sourceoutput, dest, destinput) - - source : nodewrapper node - sourceoutput : string (must be in source.outputs) - dest : nodewrapper node - destinput : string (must be in dest.inputs) - - A list of 3-tuples of the following form:: - - [(source, target, - [('sourceoutput/attribute', 'targetinput'), - ...]), - ...] - - Or:: - - [(source, target, [(('sourceoutput1', func, arg2, ...), - 'targetinput'), ...]), - ...] 
- sourceoutput1 will always be the first argument to func - and func will be evaluated and the results sent ot targetinput - - currently func needs to define all its needed imports within the - function as we use the inspect module to get at the source code - and execute it remotely - """ - if len(args) == 1: - connection_list = args[0] - elif len(args) == 4: - connection_list = [(args[0], args[2], [(args[1], args[3])])] - else: - raise Exception('unknown set of parameters to connect function') - if not kwargs: - disconnect = False - else: - disconnect = kwargs['disconnect'] - newnodes = [] - for srcnode, destnode, _ in connection_list: - if self in [srcnode, destnode]: - msg = ('Workflow connect cannot contain itself as node:' - ' src[%s] dest[%s] workflow[%s]') % (srcnode, - destnode, - self.name) - - raise IOError(msg) - if (srcnode not in newnodes) and not self._has_node(srcnode): - newnodes.append(srcnode) - if (destnode not in newnodes) and not self._has_node(destnode): - newnodes.append(destnode) - if newnodes: - self._check_nodes(newnodes) - for node in newnodes: - if node._hierarchy is None: - node._hierarchy = self.name - not_found = [] - connected_ports = {} - for srcnode, destnode, connects in connection_list: - if destnode not in connected_ports: - connected_ports[destnode] = [] - # check to see which ports of destnode are already - # connected. - if not disconnect and (destnode in self._graph.nodes()): - for edge in self._graph.in_edges_iter(destnode): - data = self._graph.get_edge_data(*edge) - for sourceinfo, destname in data['connect']: - if destname not in connected_ports[destnode]: - connected_ports[destnode] += [destname] - for source, dest in connects: - # Currently datasource/sink/grabber.io modules - # determine their inputs/outputs depending on - # connection settings. Skip these modules in the check - if dest in connected_ports[destnode]: - raise Exception(""" -Trying to connect %s:%s to %s:%s but input '%s' of node '%s' is already -connected. 
-""" % (srcnode, source, destnode, dest, dest, destnode)) - if not (hasattr(destnode, '_interface') and - '.io' in str(destnode._interface.__class__)): - if not destnode._check_inputs(dest): - not_found.append(['in', destnode.name, dest]) - if not (hasattr(srcnode, '_interface') and - '.io' in str(srcnode._interface.__class__)): - if isinstance(source, tuple): - # handles the case that source is specified - # with a function - sourcename = source[0] - elif isinstance(source, six.string_types): - sourcename = source - else: - raise Exception(('Unknown source specification in ' - 'connection from output of %s') % - srcnode.name) - if sourcename and not srcnode._check_outputs(sourcename): - not_found.append(['out', srcnode.name, sourcename]) - connected_ports[destnode] += [dest] - infostr = [] - for info in not_found: - infostr += ["Module %s has no %sput called %s\n" % (info[1], - info[0], - info[2])] - if not_found: - raise Exception('\n'.join(['Some connections were not found'] + - infostr)) - - # turn functions into strings - for srcnode, destnode, connects in connection_list: - for idx, (src, dest) in enumerate(connects): - if isinstance(src, tuple) and not isinstance(src[1], six.string_types): - function_source = getsource(src[1]) - connects[idx] = ((src[0], function_source, src[2:]), dest) - - # add connections - for srcnode, destnode, connects in connection_list: - edge_data = self._graph.get_edge_data(srcnode, destnode, None) - if edge_data: - logger.debug('(%s, %s): Edge data exists: %s' - % (srcnode, destnode, str(edge_data))) - for data in connects: - if data not in edge_data['connect']: - edge_data['connect'].append(data) - if disconnect: - logger.debug('Removing connection: %s' % str(data)) - edge_data['connect'].remove(data) - if edge_data['connect']: - self._graph.add_edges_from([(srcnode, - destnode, - edge_data)]) - else: - #pass - logger.debug('Removing connection: %s->%s' % (srcnode, - destnode)) - self._graph.remove_edges_from([(srcnode, destnode)]) - elif not disconnect: - logger.debug('(%s, %s): No edge data' % (srcnode, destnode)) - self._graph.add_edges_from([(srcnode, destnode, - {'connect': connects})]) - edge_data = self._graph.get_edge_data(srcnode, destnode) - logger.debug('(%s, %s): new edge data: %s' % (srcnode, destnode, - str(edge_data))) - - def disconnect(self, *args): - """Disconnect two nodes - - See the docstring for connect for format. 
- """ - # yoh: explicit **dict was introduced for compatibility with Python 2.5 - return self.connect(*args, **dict(disconnect=True)) - - def add_nodes(self, nodes): - """ Add nodes to a workflow - - Parameters - ---------- - nodes : list - A list of WorkflowBase-based objects - """ - newnodes = [] - all_nodes = self._get_all_nodes() - for node in nodes: - if self._has_node(node): - raise IOError('Node %s already exists in the workflow' % node) - if isinstance(node, Workflow): - for subnode in node._get_all_nodes(): - if subnode in all_nodes: - raise IOError(('Subnode %s of node %s already exists ' - 'in the workflow') % (subnode, node)) - newnodes.append(node) - if not newnodes: - logger.debug('no new nodes to add') - return - for node in newnodes: - if not issubclass(node.__class__, WorkflowBase): - raise Exception('Node %s must be a subclass of WorkflowBase' % - str(node)) - self._check_nodes(newnodes) - for node in newnodes: - if node._hierarchy is None: - node._hierarchy = self.name - self._graph.add_nodes_from(newnodes) - - def remove_nodes(self, nodes): - """ Remove nodes from a workflow - - Parameters - ---------- - nodes : list - A list of WorkflowBase-based objects - """ - self._graph.remove_nodes_from(nodes) - - # Input-Output access - @property - def inputs(self): - return self._get_inputs() - - @property - def outputs(self): - return self._get_outputs() - - def get_node(self, name): - """Return an internal node by name - """ - nodenames = name.split('.') - nodename = nodenames[0] - outnode = [node for node in self._graph.nodes() if - str(node).endswith('.' + nodename)] - if outnode: - outnode = outnode[0] - if nodenames[1:] and issubclass(outnode.__class__, Workflow): - outnode = outnode.get_node('.'.join(nodenames[1:])) - else: - outnode = None - return outnode - - def list_node_names(self): - """List names of all nodes in a workflow - """ - outlist = [] - for node in nx.topological_sort(self._graph): - if isinstance(node, Workflow): - outlist.extend(['.'.join((node.name, nodename)) for nodename in - node.list_node_names()]) - else: - outlist.append(node.name) - return sorted(outlist) - - def write_graph(self, dotfilename='graph.dot', graph2use='hierarchical', - format="png", simple_form=True): - """Generates a graphviz dot file and a png file - - Parameters - ---------- - - graph2use: 'orig', 'hierarchical' (default), 'flat', 'exec', 'colored' - orig - creates a top level graph without expanding internal - workflow nodes; - flat - expands workflow nodes recursively; - hierarchical - expands workflow nodes recursively with a - notion on hierarchy; - colored - expands workflow nodes recursively with a - notion on hierarchy in color; - exec - expands workflows to depict iterables - - format: 'png', 'svg' - - simple_form: boolean (default: True) - Determines if the node name used in the graph should be of the form - 'nodename (package)' when True or 'nodename.Class.package' when - False. - - """ - graphtypes = ['orig', 'flat', 'hierarchical', 'exec', 'colored'] - if graph2use not in graphtypes: - raise ValueError('Unknown graph2use keyword. 
Must be one of: ' + - str(graphtypes)) - base_dir, dotfilename = op.split(dotfilename) - if base_dir == '': - if self.base_dir: - base_dir = self.base_dir - if self.name: - base_dir = op.join(base_dir, self.name) - else: - base_dir = os.getcwd() - base_dir = make_output_dir(base_dir) - if graph2use in ['hierarchical', 'colored']: - dotfilename = op.join(base_dir, dotfilename) - self.write_hierarchical_dotfile(dotfilename=dotfilename, - colored=graph2use == "colored", - simple_form=simple_form) - format_dot(dotfilename, format=format) - else: - graph = self._graph - if graph2use in ['flat', 'exec']: - graph = self._create_flat_graph() - if graph2use == 'exec': - graph = generate_expanded_graph(deepcopy(graph)) - export_graph(graph, base_dir, dotfilename=dotfilename, - format=format, simple_form=simple_form) - - def write_hierarchical_dotfile(self, dotfilename=None, colored=False, - simple_form=True): - dotlist = ['digraph %s{' % self.name] - dotlist.append(self._get_dot(prefix=' ', colored=colored, - simple_form=simple_form)) - dotlist.append('}') - dotstr = '\n'.join(dotlist) - if dotfilename: - fp = open(dotfilename, 'wt') - fp.writelines(dotstr) - fp.close() - else: - logger.info(dotstr) - - def export(self, filename=None, prefix="output", format="python", - include_config=False): - """Export object into a different format - - Parameters - ---------- - filename: string - file to save the code to; overrides prefix - prefix: string - prefix to use for output file - format: string - one of "python" - include_config: boolean - whether to include node and workflow config values - - """ - formats = ["python"] - if format not in formats: - raise ValueError('format must be one of: %s' % '|'.join(formats)) - flatgraph = self._create_flat_graph() - nodes = nx.topological_sort(flatgraph) - - lines = ['# Workflow'] - importlines = ['from nipype.pipeline.engine import Workflow, ' - 'Node, MapNode'] - functions = {} - if format == "python": - connect_template = '%s.connect(%%s, %%s, %%s, "%%s")' % self.name - connect_template2 = '%s.connect(%%s, "%%s", %%s, "%%s")' \ - % self.name - wfdef = '%s = Workflow("%s")' % (self.name, self.name) - lines.append(wfdef) - if include_config: - lines.append('%s.config = %s' % (self.name, self.config)) - for idx, node in enumerate(nodes): - nodename = node.fullname.replace('.', '_') - # write nodes - nodelines = format_node(node, format='python', - include_config=include_config) - for line in nodelines: - if line.startswith('from'): - if line not in importlines: - importlines.append(line) - else: - lines.append(line) - # write connections - for u, _, d in flatgraph.in_edges_iter(nbunch=node, - data=True): - for cd in d['connect']: - if isinstance(cd[0], tuple): - args = list(cd[0]) - if args[1] in functions: - funcname = functions[args[1]] - else: - func = create_function_from_source(args[1]) - funcname = [name for name in func.func_globals - if name != '__builtins__'][0] - functions[args[1]] = funcname - args[1] = funcname - args = tuple([arg for arg in args if arg]) - line_args = (u.fullname.replace('.', '_'), - args, nodename, cd[1]) - line = connect_template % line_args - line = line.replace("'%s'" % funcname, funcname) - lines.append(line) - else: - line_args = (u.fullname.replace('.', '_'), - cd[0], nodename, cd[1]) - lines.append(connect_template2 % line_args) - functionlines = ['# Functions'] - for function in functions: - functionlines.append(cPickle.loads(function).rstrip()) - all_lines = importlines + functionlines + lines - - if not filename: - filename 
= '%s%s.py' % (prefix, self.name) - with open(filename, 'wt') as fp: - fp.writelines('\n'.join(all_lines)) - return all_lines - - def run(self, plugin=None, plugin_args=None, updatehash=False): - """ Execute the workflow - - Parameters - ---------- - - plugin: plugin name or object - Plugin to use for execution. You can create your own plugins for - execution. - plugin_args : dictionary containing arguments to be sent to plugin - constructor. see individual plugin doc strings for details. - """ - if plugin is None: - plugin = config.get('execution', 'plugin') - if type(plugin) is not str: - runner = plugin - else: - name = 'nipype.pipeline.plugins' - try: - __import__(name) - except ImportError: - msg = 'Could not import plugin module: %s' % name - logger.error(msg) - raise ImportError(msg) - else: - plugin_mod = getattr(sys.modules[name], '%sPlugin' % plugin) - runner = plugin_mod(plugin_args=plugin_args) - flatgraph = self._create_flat_graph() - self.config = merge_dict(deepcopy(config._sections), self.config) - if 'crashdump_dir' in self.config: - warn(("Deprecated: workflow.config['crashdump_dir']\n" - "Please use config['execution']['crashdump_dir']")) - crash_dir = self.config['crashdump_dir'] - self.config['execution']['crashdump_dir'] = crash_dir - del self.config['crashdump_dir'] - logger.info(str(sorted(self.config))) - self._set_needed_outputs(flatgraph) - execgraph = generate_expanded_graph(deepcopy(flatgraph)) - for index, node in enumerate(execgraph.nodes()): - node.config = merge_dict(deepcopy(self.config), node.config) - node.base_dir = self.base_dir - node.index = index - if isinstance(node, MapNode): - node.use_plugin = (plugin, plugin_args) - self._configure_exec_nodes(execgraph) - if str2bool(self.config['execution']['create_report']): - self._write_report_info(self.base_dir, self.name, execgraph) - runner.run(execgraph, updatehash=updatehash, config=self.config) - datestr = datetime.utcnow().strftime('%Y%m%dT%H%M%S') - if str2bool(self.config['execution']['write_provenance']): - prov_base = op.join(self.base_dir, - 'workflow_provenance_%s' % datestr) - logger.info('Provenance file prefix: %s' % prov_base) - write_workflow_prov(execgraph, prov_base, format='all') - return execgraph - - # PRIVATE API AND FUNCTIONS - - def _write_report_info(self, workingdir, name, graph): - if workingdir is None: - workingdir = os.getcwd() - report_dir = op.join(workingdir, name) - if not op.exists(report_dir): - os.makedirs(report_dir) - shutil.copyfile(op.join(op.dirname(__file__), - 'report_template.html'), - op.join(report_dir, 'index.html')) - shutil.copyfile(op.join(op.dirname(__file__), - '..', 'external', 'd3.js'), - op.join(report_dir, 'd3.js')) - nodes, groups = topological_sort(graph, depth_first=True) - graph_file = op.join(report_dir, 'graph1.json') - json_dict = {'nodes': [], 'links': [], 'groups': [], 'maxN': 0} - for i, node in enumerate(nodes): - report_file = "%s/_report/report.rst" % \ - node.output_dir().replace(report_dir, '') - result_file = "%s/result_%s.pklz" % \ - (node.output_dir().replace(report_dir, ''), - node.name) - json_dict['nodes'].append(dict(name='%d_%s' % (i, node.name), - report=report_file, - result=result_file, - group=groups[i])) - maxN = 0 - for gid in np.unique(groups): - procs = [i for i, val in enumerate(groups) if val == gid] - N = len(procs) - if N > maxN: - maxN = N - json_dict['groups'].append(dict(procs=procs, - total=N, - name='Group_%05d' % gid)) - json_dict['maxN'] = maxN - for u, v in graph.in_edges_iter(): - 
json_dict['links'].append(dict(source=nodes.index(u), - target=nodes.index(v), - value=1)) - save_json(graph_file, json_dict) - graph_file = op.join(report_dir, 'graph.json') - # Avoid RuntimeWarning: divide by zero encountered in log10 - num_nodes = len(nodes) - if num_nodes > 0: - index_name = np.ceil(np.log10(num_nodes)).astype(int) - else: - index_name = 0 - template = '%%0%dd_' % index_name - - def getname(u, i): - name_parts = u.fullname.split('.') - #return '.'.join(name_parts[:-1] + [template % i + name_parts[-1]]) - return template % i + name_parts[-1] - json_dict = [] - for i, node in enumerate(nodes): - imports = [] - for u, v in graph.in_edges_iter(nbunch=node): - imports.append(getname(u, nodes.index(u))) - json_dict.append(dict(name=getname(node, i), - size=1, - group=groups[i], - imports=imports)) - save_json(graph_file, json_dict) - - def _set_needed_outputs(self, graph): - """Initialize node with list of which outputs are needed.""" - rm_outputs = self.config['execution']['remove_unnecessary_outputs'] - if not str2bool(rm_outputs): - return - for node in graph.nodes(): - node.needed_outputs = [] - for edge in graph.out_edges_iter(node): - data = graph.get_edge_data(*edge) - for sourceinfo, _ in sorted(data['connect']): - if isinstance(sourceinfo, tuple): - input_name = sourceinfo[0] - else: - input_name = sourceinfo - if input_name not in node.needed_outputs: - node.needed_outputs += [input_name] - if node.needed_outputs: - node.needed_outputs = sorted(node.needed_outputs) - - def _configure_exec_nodes(self, graph): - """Ensure that each node knows where to get inputs from - """ - for node in graph.nodes(): - node.input_source = {} - for edge in graph.in_edges_iter(node): - data = graph.get_edge_data(*edge) - for sourceinfo, field in sorted(data['connect']): - node.input_source[field] = \ - (op.join(edge[0].output_dir(), - 'result_%s.pklz' % edge[0].name), - sourceinfo) - - def _check_nodes(self, nodes): - """Checks if any of the nodes are already in the graph - - """ - node_names = [node.name for node in self._graph.nodes()] - node_lineage = [node._hierarchy for node in self._graph.nodes()] - for node in nodes: - if node.name in node_names: - idx = node_names.index(node.name) - if node_lineage[idx] in [node._hierarchy, self.name]: - raise IOError('Duplicate node name %s found.' 
% node.name) - else: - node_names.append(node.name) - - def _has_attr(self, parameter, subtype='in'): - """Checks if a parameter is available as an input or output - """ - if subtype == 'in': - subobject = self.inputs - else: - subobject = self.outputs - attrlist = parameter.split('.') - cur_out = subobject - for attr in attrlist: - if not hasattr(cur_out, attr): - return False - cur_out = getattr(cur_out, attr) - return True - - def _get_parameter_node(self, parameter, subtype='in'): - """Returns the underlying node corresponding to an input or - output parameter - """ - if subtype == 'in': - subobject = self.inputs - else: - subobject = self.outputs - attrlist = parameter.split('.') - cur_out = subobject - for attr in attrlist[:-1]: - cur_out = getattr(cur_out, attr) - return cur_out.traits()[attrlist[-1]].node - - def _check_outputs(self, parameter): - return self._has_attr(parameter, subtype='out') - - def _check_inputs(self, parameter): - return self._has_attr(parameter, subtype='in') - - def _get_inputs(self): - """Returns the inputs of a workflow - - This function does not return any input ports that are already - connected - """ - inputdict = TraitedSpec() - for node in self._graph.nodes(): - inputdict.add_trait(node.name, traits.Instance(TraitedSpec)) - if isinstance(node, Workflow): - setattr(inputdict, node.name, node.inputs) - else: - taken_inputs = [] - for _, _, d in self._graph.in_edges_iter(nbunch=node, - data=True): - for cd in d['connect']: - taken_inputs.append(cd[1]) - unconnectedinputs = TraitedSpec() - for key, trait in node.inputs.items(): - if key not in taken_inputs: - unconnectedinputs.add_trait(key, - traits.Trait(trait, - node=node)) - value = getattr(node.inputs, key) - setattr(unconnectedinputs, key, value) - setattr(inputdict, node.name, unconnectedinputs) - getattr(inputdict, node.name).on_trait_change(self._set_input) - return inputdict - - def _get_outputs(self): - """Returns all possible output ports that are not already connected - """ - outputdict = TraitedSpec() - for node in self._graph.nodes(): - outputdict.add_trait(node.name, traits.Instance(TraitedSpec)) - if isinstance(node, Workflow): - setattr(outputdict, node.name, node.outputs) - elif node.outputs: - outputs = TraitedSpec() - for key, _ in node.outputs.items(): - outputs.add_trait(key, traits.Any(node=node)) - setattr(outputs, key, None) - setattr(outputdict, node.name, outputs) - return outputdict - - def _set_input(self, object, name, newvalue): - """Trait callback function to update a node input - """ - object.traits()[name].node.set_input(name, newvalue) - - def _set_node_input(self, node, param, source, sourceinfo): - """Set inputs of a node given the edge connection""" - if isinstance(sourceinfo, six.string_types): - val = source.get_output(sourceinfo) - elif isinstance(sourceinfo, tuple): - if callable(sourceinfo[1]): - val = sourceinfo[1](source.get_output(sourceinfo[0]), - *sourceinfo[2:]) - newval = val - if isinstance(val, TraitDictObject): - newval = dict(val) - if isinstance(val, TraitListObject): - newval = val[:] - logger.debug('setting node input: %s->%s', param, str(newval)) - node.set_input(param, deepcopy(newval)) - - def _get_all_nodes(self): - allnodes = [] - for node in self._graph.nodes(): - if isinstance(node, Workflow): - allnodes.extend(node._get_all_nodes()) - else: - allnodes.append(node) - return allnodes - - def _has_node(self, wanted_node): - for node in self._graph.nodes(): - if wanted_node == node: - return True - if isinstance(node, Workflow): - if 
node._has_node(wanted_node): - return True - return False - - def _create_flat_graph(self): - """Make a simple DAG where no node is a workflow.""" - logger.debug('Creating flat graph for workflow: %s', self.name) - workflowcopy = deepcopy(self) - workflowcopy._generate_flatgraph() - return workflowcopy._graph - - def _reset_hierarchy(self): - """Reset the hierarchy on a graph - """ - for node in self._graph.nodes(): - if isinstance(node, Workflow): - node._reset_hierarchy() - for innernode in node._graph.nodes(): - innernode._hierarchy = '.'.join((self.name, - innernode._hierarchy)) - else: - node._hierarchy = self.name - - def _generate_flatgraph(self): - """Generate a graph containing only Nodes or MapNodes - """ - logger.debug('expanding workflow: %s', self) - nodes2remove = [] - if not nx.is_directed_acyclic_graph(self._graph): - raise Exception(('Workflow: %s is not a directed acyclic graph ' - '(DAG)') % self.name) - nodes = nx.topological_sort(self._graph) - for node in nodes: - logger.debug('processing node: %s' % node) - if isinstance(node, Workflow): - nodes2remove.append(node) - # use in_edges instead of in_edges_iter to allow - # disconnections to take place properly. otherwise, the - # edge dict is modified. - for u, _, d in self._graph.in_edges(nbunch=node, data=True): - logger.debug('in: connections-> %s' % str(d['connect'])) - for cd in deepcopy(d['connect']): - logger.debug("in: %s" % str(cd)) - dstnode = node._get_parameter_node(cd[1], subtype='in') - srcnode = u - srcout = cd[0] - dstin = cd[1].split('.')[-1] - logger.debug('in edges: %s %s %s %s' % - (srcnode, srcout, dstnode, dstin)) - self.disconnect(u, cd[0], node, cd[1]) - self.connect(srcnode, srcout, dstnode, dstin) - # do not use out_edges_iter for reasons stated in in_edges - for _, v, d in self._graph.out_edges(nbunch=node, data=True): - logger.debug('out: connections-> %s' % str(d['connect'])) - for cd in deepcopy(d['connect']): - logger.debug("out: %s" % str(cd)) - dstnode = v - if isinstance(cd[0], tuple): - parameter = cd[0][0] - else: - parameter = cd[0] - srcnode = node._get_parameter_node(parameter, - subtype='out') - if isinstance(cd[0], tuple): - srcout = list(cd[0]) - srcout[0] = parameter.split('.')[-1] - srcout = tuple(srcout) - else: - srcout = parameter.split('.')[-1] - dstin = cd[1] - logger.debug('out edges: %s %s %s %s' % (srcnode, - srcout, - dstnode, - dstin)) - self.disconnect(node, cd[0], v, cd[1]) - self.connect(srcnode, srcout, dstnode, dstin) - # expand the workflow node - #logger.debug('expanding workflow: %s', node) - node._generate_flatgraph() - for innernode in node._graph.nodes(): - innernode._hierarchy = '.'.join((self.name, - innernode._hierarchy)) - self._graph.add_nodes_from(node._graph.nodes()) - self._graph.add_edges_from(node._graph.edges(data=True)) - if nodes2remove: - self._graph.remove_nodes_from(nodes2remove) - logger.debug('finished expanding workflow: %s', self) - - def _get_dot(self, prefix=None, hierarchy=None, colored=False, - simple_form=True, level=0): - """Create a dot file with connection info - """ - if prefix is None: - prefix = ' ' - if hierarchy is None: - hierarchy = [] - colorset = ['#FFFFC8','#0000FF','#B4B4FF','#E6E6FF','#FF0000', - '#FFB4B4','#FFE6E6','#00A300','#B4FFB4','#E6FFE6'] - - dotlist = ['%slabel="%s";' % (prefix, self.name)] - for node in nx.topological_sort(self._graph): - fullname = '.'.join(hierarchy + [node.fullname]) - nodename = fullname.replace('.', '_') - if not isinstance(node, Workflow): - node_class_name = get_print_name(node, 
simple_form=simple_form) - if not simple_form: - node_class_name = '.'.join(node_class_name.split('.')[1:]) - if hasattr(node, 'iterables') and node.iterables: - dotlist.append(('%s[label="%s", shape=box3d,' - 'style=filled, color=black, colorscheme' - '=greys7 fillcolor=2];') % (nodename, - node_class_name)) - else: - if colored: - dotlist.append(('%s[label="%s", style=filled,' - ' fillcolor="%s"];') - % (nodename,node_class_name, - colorset[level])) - else: - dotlist.append(('%s[label="%s"];') - % (nodename,node_class_name)) - - for node in nx.topological_sort(self._graph): - if isinstance(node, Workflow): - fullname = '.'.join(hierarchy + [node.fullname]) - nodename = fullname.replace('.', '_') - dotlist.append('subgraph cluster_%s {' % nodename) - if colored: - dotlist.append(prefix + prefix + 'edge [color="%s"];' % (colorset[level+1])) - dotlist.append(prefix + prefix + 'style=filled;') - dotlist.append(prefix + prefix + 'fillcolor="%s";' % (colorset[level+2])) - dotlist.append(node._get_dot(prefix=prefix + prefix, - hierarchy=hierarchy + [self.name], - colored=colored, - simple_form=simple_form, level=level+3)) - dotlist.append('}') - if level==6:level=2 - else: - for subnode in self._graph.successors_iter(node): - if node._hierarchy != subnode._hierarchy: - continue - if not isinstance(subnode, Workflow): - nodefullname = '.'.join(hierarchy + [node.fullname]) - subnodefullname = '.'.join(hierarchy + - [subnode.fullname]) - nodename = nodefullname.replace('.', '_') - subnodename = subnodefullname.replace('.', '_') - for _ in self._graph.get_edge_data(node, - subnode)['connect']: - dotlist.append('%s -> %s;' % (nodename, - subnodename)) - logger.debug('connection: ' + dotlist[-1]) - # add between workflow connections - for u, v, d in self._graph.edges_iter(data=True): - uname = '.'.join(hierarchy + [u.fullname]) - vname = '.'.join(hierarchy + [v.fullname]) - for src, dest in d['connect']: - uname1 = uname - vname1 = vname - if isinstance(src, tuple): - srcname = src[0] - else: - srcname = src - if '.' in srcname: - uname1 += '.' + '.'.join(srcname.split('.')[:-1]) - if '.' in dest and '@' not in dest: - if not isinstance(v, Workflow): - if 'datasink' not in \ - str(v._interface.__class__).lower(): - vname1 += '.' + '.'.join(dest.split('.')[:-1]) - else: - vname1 += '.' 
+ '.'.join(dest.split('.')[:-1]) - if uname1.split('.')[:-1] != vname1.split('.')[:-1]: - dotlist.append('%s -> %s;' % (uname1.replace('.', '_'), - vname1.replace('.', '_'))) - logger.debug('cross connection: ' + dotlist[-1]) - return ('\n' + prefix).join(dotlist) - - class Node(EngineBase): """Wraps interface objects for use in pipeline diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 193fb0650a..b14d73a307 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -644,7 +644,13 @@ def _write_report_info(self, workingdir, name, graph): value=1)) save_json(graph_file, json_dict) graph_file = op.join(report_dir, 'graph.json') - template = '%%0%dd_' % np.ceil(np.log10(len(nodes))).astype(int) + # Avoid RuntimeWarning: divide by zero encountered in log10 + num_nodes = len(nodes) + if num_nodes > 0: + index_name = np.ceil(np.log10(num_nodes)).astype(int) + else: + index_name = 0 + template = '%%0%dd_' % index_name def getname(u, i): name_parts = u.fullname.split('.') From 6ee17d48131eefa127e301711a50e4fcce09554d Mon Sep 17 00:00:00 2001 From: Chris Gorgolewski Date: Sun, 31 Jan 2016 19:12:04 -0800 Subject: [PATCH 16/22] make FSL course download slightly easier to debug --- circle.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/circle.yml b/circle.yml index 4f0d31d060..f08c1029f3 100644 --- a/circle.yml +++ b/circle.yml @@ -23,8 +23,8 @@ dependencies: - pip install -e . - pip install matplotlib sphinx ipython boto - gem install fakes3 - - if [[ ! -d ~/examples/data ]]; then wget "http://tcpdiag.dl.sourceforge.net/project/nipy/nipype/nipype-0.2/nipype-tutorial.tar.bz2"; tar jxvf nipype-tutorial.tar.bz2; mkdir ~/examples; mv nipype-tutorial/* ~/examples/; fi - - if [[ ! -d ~/examples/fsl_course_data ]]; then wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/fdt1.tar.gz" ; wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/fdt2.tar.gz"; wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/tbss.tar.gz"; mkdir ~/examples/fsl_course_data; tar zxvf fdt1.tar.gz -C ~/examples/fsl_course_data; tar zxvf fdt2.tar.gz -C ~/examples/fsl_course_data; tar zxvf tbss.tar.gz -C ~/examples/fsl_course_data; fi + - if [[ ! -d ~/examples/data ]]; then wget "http://tcpdiag.dl.sourceforge.net/project/nipy/nipype/nipype-0.2/nipype-tutorial.tar.bz2" && tar jxvf nipype-tutorial.tar.bz2 && mkdir ~/examples && mv nipype-tutorial/* ~/examples/; fi + - if [[ ! 
-d ~/examples/fsl_course_data ]]; then wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/fdt1.tar.gz" && wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/fdt2.tar.gz" && wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/tbss.tar.gz" && mkdir ~/examples/fsl_course_data && tar zxvf fdt1.tar.gz -C ~/examples/fsl_course_data && tar zxvf fdt2.tar.gz -C ~/examples/fsl_course_data && tar zxvf tbss.tar.gz -C ~/examples/fsl_course_data; fi
   - bash ~/nipype/tools/install_spm_mcr.sh
   - mkdir -p ~/.nipype && echo '[logging]' > ~/.nipype/nipype.cfg && echo 'workflow_level = DEBUG' >> ~/.nipype/nipype.cfg && echo 'interface_level = DEBUG' >> ~/.nipype/nipype.cfg && echo 'filemanip_level = DEBUG' >> ~/.nipype/nipype.cfg
 machine:

From 0e195b551d5914a15c6193ea9742ea66f9156204 Mon Sep 17 00:00:00 2001
From: Chris Filo Gorgolewski
Date: Sun, 31 Jan 2016 19:51:44 -0800
Subject: [PATCH 17/22] fixed mkdir

---
 circle.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/circle.yml b/circle.yml
index f08c1029f3..cbbca57f41 100644
--- a/circle.yml
+++ b/circle.yml
@@ -23,7 +23,7 @@ dependencies:
   - pip install -e .
   - pip install matplotlib sphinx ipython boto
   - gem install fakes3
-  - if [[ ! -d ~/examples/data ]]; then wget "http://tcpdiag.dl.sourceforge.net/project/nipy/nipype/nipype-0.2/nipype-tutorial.tar.bz2" && tar jxvf nipype-tutorial.tar.bz2 && mkdir ~/examples && mv nipype-tutorial/* ~/examples/; fi
+  - if [[ ! -d ~/examples/data ]]; then wget "http://tcpdiag.dl.sourceforge.net/project/nipy/nipype/nipype-0.2/nipype-tutorial.tar.bz2" && tar jxvf nipype-tutorial.tar.bz2 && mv nipype-tutorial/* ~/examples/; fi
   - if [[ ! -d ~/examples/fsl_course_data ]]; then wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/fdt1.tar.gz" && wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/fdt2.tar.gz" && wget -c "http://fsl.fmrib.ox.ac.uk/fslcourse/tbss.tar.gz" && mkdir ~/examples/fsl_course_data && tar zxvf fdt1.tar.gz -C ~/examples/fsl_course_data && tar zxvf fdt2.tar.gz -C ~/examples/fsl_course_data && tar zxvf tbss.tar.gz -C ~/examples/fsl_course_data; fi
   - bash ~/nipype/tools/install_spm_mcr.sh
   - mkdir -p ~/.nipype && echo '[logging]' > ~/.nipype/nipype.cfg && echo 'workflow_level = DEBUG' >> ~/.nipype/nipype.cfg && echo 'interface_level = DEBUG' >> ~/.nipype/nipype.cfg && echo 'filemanip_level = DEBUG' >> ~/.nipype/nipype.cfg
 machine:

From 333122ea18fa1411932f0bb9553acbd39ec64aef Mon Sep 17 00:00:00 2001
From: Alexandre Manhaes Savio
Date: Thu, 28 Jan 2016 13:46:24 +0100
Subject: [PATCH 18/22] enh: add PETPVC wrapper

---
 nipype/interfaces/petpvc.py | 235 ++++++++++++++++++++++++++++++++++++
 1 file changed, 235 insertions(+)
 create mode 100644 nipype/interfaces/petpvc.py

diff --git a/nipype/interfaces/petpvc.py b/nipype/interfaces/petpvc.py
new file mode 100644
index 0000000000..519282491a
--- /dev/null
+++ b/nipype/interfaces/petpvc.py
@@ -0,0 +1,235 @@
+# -*- coding: utf-8 -*-
+# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
+# vi: set ft=python sts=4 ts=4 sw=4 et:
+"""
+Nipype interface for PETPVC.
+
+PETPVC is a software tool from the Nuclear Medicine Department
+of the UCL University Hospital, London, UK.
+
+Its source code is here: https://github.com/UCL/PETPVC
+
+The methods that it implements are explained here:
+K. Erlandsson, I. Buvat, P. H. Pretorius, B. A. Thomas, and B. F. Hutton,
+“A review of partial volume correction techniques for emission tomography
+and their applications in neurology, cardiology and oncology,” Phys. Med.
+Biol., vol. 57, no. 21, p. R119, 2012.
+
+There is a publication waiting to be accepted for this software tool.
+
+
+Its command line help shows this:
+
+   -i --input < filename >
+      = PET image file
+   -o --output < filename >
+      = Output file
+   [ -m --mask < filename > ]
+      = Mask image file
+   -p --pvc < keyword >
+      = Desired PVC method
+   -x < X >
+      = The full-width at half maximum in mm along x-axis
+   -y < Y >
+      = The full-width at half maximum in mm along y-axis
+   -z < Z >
+      = The full-width at half maximum in mm along z-axis
+   [ -d --debug ]
+      = Prints debug information
+   [ -n --iter [ Val ] ]
+      = Number of iterations
+        With: Val (Default = 10)
+   [ -k [ Val ] ]
+      = Number of deconvolution iterations
+        With: Val (Default = 10)
+   [ -a --alpha [ aval ] ]
+      = Alpha value
+        With: aval (Default = 1.5)
+   [ -s --stop [ stopval ] ]
+      = Stopping criterion
+        With: stopval (Default = 0.01)
+
+----------------------------------------------
+Technique - keyword
+
+Geometric transfer matrix - "GTM"
+Labbe approach - "LABBE"
+Richardson-Lucy - "RL"
+Van-Cittert - "VC"
+Region-based voxel-wise correction - "RBV"
+RBV with Labbe - "LABBE+RBV"
+RBV with Van-Cittert - "RBV+VC"
+RBV with Richardson-Lucy - "RBV+RL"
+RBV with Labbe and Van-Cittert - "LABBE+RBV+VC"
+RBV with Labbe and Richardson-Lucy- "LABBE+RBV+RL"
+Multi-target correction - "MTC"
+MTC with Labbe - "LABBE+MTC"
+MTC with Van-Cittert - "MTC+VC"
+MTC with Richardson-Lucy - "MTC+RL"
+MTC with Labbe and Van-Cittert - "LABBE+MTC+VC"
+MTC with Labbe and Richardson-Lucy- "LABBE+MTC+RL"
+Iterative Yang - "IY"
+Iterative Yang with Van-Cittert - "IY+VC"
+Iterative Yang with Richardson-Lucy - "IY+RL"
+Muller Gartner - "MG"
+Muller Gartner with Van-Cittert - "MG+VC"
+Muller Gartner with Richardson-Lucy - "MG+RL"
+
+"""
+from __future__ import print_function
+from __future__ import division
+
+import os
+import warnings
+
+from nipype.interfaces.base import (
+    TraitedSpec,
+    CommandLineInputSpec,
+    CommandLine,
+    File,
+    isdefined,
+    traits,
+)
+
+warn = warnings.warn
+
+pvc_methods = ['GTM',
+               'IY',
+               'IY+RL',
+               'IY+VC',
+               'LABBE',
+               'LABBE+MTC',
+               'LABBE+MTC+RL',
+               'LABBE+MTC+VC',
+               'LABBE+RBV',
+               'LABBE+RBV+RL',
+               'LABBE+RBV+VC',
+               'MG',
+               'MG+RL',
+               'MG+VC',
+               'MTC',
+               'MTC+RL',
+               'MTC+VC',
+               'RBV',
+               'RBV+RL',
+               'RBV+VC',
+               'RL',
+               'VC']
+
+
+class PETPVCInputSpec(CommandLineInputSpec):
+    in_file = File(desc="PET image file", exists=True, mandatory=True, argstr="-i %s")
+    out_file = File(desc="Output file", genfile=True, hash_files=False, argstr="-o %s")
+    mask_file = File(desc="Mask image file", exists=True, mandatory=True, argstr="-m %s")
+    pvc = traits.Enum(pvc_methods, desc="Desired PVC method", mandatory=True, argstr="-p %s")
+    fwhm_x = traits.Float(desc="The full-width at half maximum in mm along x-axis", mandatory=True, argstr="-x %.4f")
+    fwhm_y = traits.Float(desc="The full-width at half maximum in mm along y-axis", mandatory=True, argstr="-y %.4f")
+    fwhm_z = traits.Float(desc="The full-width at half maximum in mm along z-axis", mandatory=True, argstr="-z %.4f")
+    debug = traits.Bool(desc="Prints debug information", usedefault=True, default_value=False, argstr="-d")
+    n_iter = traits.Int(desc="Number of iterations", default_value=10, argstr="-n %d")
+    n_deconv = traits.Int(desc="Number of deconvolution iterations", default_value=10, argstr="-k %d")
+    alpha = traits.Float(desc="Alpha value", default_value=1.5, argstr="-a %.4f")
+    stop_crit = traits.Float(desc="Stopping criterion", default_value=0.01, argstr="-s %.4f")
+
+
+class PETPVCOutputSpec(TraitedSpec):
+    out_file = 
File(desc="Output file")
+
+
+class PETPVC(CommandLine):
+    """ Use PETPVC for partial volume correction of PET images.
+
+    Examples
+    --------
+    >>> from ..testing import example_data
+    >>> #TODO get data for PETPVC
+    >>> pvc = PETPVC()
+    >>> pvc.inputs.in_file = example_data('pet.nii.gz')
+    >>> pvc.inputs.mask_file = example_data('tissues.nii.gz')
+    >>> pvc.inputs.out_file = 'pet_pvc_rbv.nii.gz'
+    >>> pvc.inputs.pvc = 'RBV'
+    >>> pvc.inputs.fwhm_x = 2.0
+    >>> pvc.inputs.fwhm_y = 2.0
+    >>> pvc.inputs.fwhm_z = 2.0
+    >>> outs = pvc.run() #doctest: +SKIP
+    """
+    input_spec = PETPVCInputSpec
+    output_spec = PETPVCOutputSpec
+    _cmd = 'petpvc'
+
+    def _list_outputs(self):
+        outputs = self.output_spec().get()
+        outputs['out_file'] = self.inputs.out_file
+        if not isdefined(outputs['out_file']):
+            method_name = self.inputs.pvc.lower()
+            outputs['out_file'] = self._gen_fname(self.inputs.in_file,
+                                                  suffix='_{}_pvc'.format(method_name))
+
+        outputs['out_file'] = os.path.abspath(outputs['out_file'])
+        return outputs
+
+    def _gen_fname(self, basename, cwd=None, suffix=None, change_ext=True,
+                   ext='.nii.gz'):
+        """Generate a filename based on the given parameters.
+
+        The filename will take the form: cwd/basename<suffix><ext>.
+        If change_ext is True, the extension of the result is replaced
+        with the given `ext`.
+
+        Parameters
+        ----------
+        basename : str
+            Filename to base the new filename on.
+        cwd : str
+            Path to prefix to the new filename. (default is os.getcwd())
+        suffix : str
+            Suffix to add to the `basename`. (default is '')
+        change_ext : bool
+            Flag to change the filename extension to the given `ext`.
+            (Default is True)
+
+        Returns
+        -------
+        fname : str
+            New filename based on given parameters.
+
+        """
+        from nipype.utils.filemanip import fname_presuffix
+
+        if basename == '':
+            msg = 'Unable to generate filename for command %s. ' % self.cmd
+            msg += 'basename is not set!' 
+ raise ValueError(msg) + if cwd is None: + cwd = os.getcwd() + if change_ext: + if suffix: + suffix = ''.join((suffix, ext)) + else: + suffix = ext + if suffix is None: + suffix = '' + fname = fname_presuffix(basename, suffix=suffix, + use_ext=False, newpath=cwd) + return fname + + def _gen_filename(self, name): + if name == 'out_file': + return self._list_outputs()['out_file'] + return None + + +if __name__ == '__main__': + + #from .testing import example_data + #TODO get data for PETPVC + + pvc = PETPVC() + pvc.inputs.in_file = example_data('pet.nii.gz') + pvc.inputs.mask_file = example_data('tissues.nii.gz') + pvc.inputs.out_file = 'pet_pvc_rbv.nii.gz' + pvc.inputs.pvc = 'RBV' + pvc.inputs.fwhm_x = 2.0 + pvc.inputs.fwhm_y = 2.0 + pvc.inputs.fwhm_z = 2.0 + pvc.run() From 248b38febcf75fa6a737ea11566e903a56f37a20 Mon Sep 17 00:00:00 2001 From: Alexandre Manhaes Savio Date: Thu, 28 Jan 2016 17:49:41 +0100 Subject: [PATCH 19/22] Add empty petpvc example files --- nipype/interfaces/petpvc.py | 25 ++++++------------------- nipype/testing/data/pet.nii.gz | 0 nipype/testing/data/tissues.nii.gz | 0 3 files changed, 6 insertions(+), 19 deletions(-) create mode 100644 nipype/testing/data/pet.nii.gz create mode 100644 nipype/testing/data/tissues.nii.gz diff --git a/nipype/interfaces/petpvc.py b/nipype/interfaces/petpvc.py index 519282491a..898624a48d 100644 --- a/nipype/interfaces/petpvc.py +++ b/nipype/interfaces/petpvc.py @@ -1,7 +1,10 @@ -# -*- coding: utf-8 -*- -# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- -# vi: set ft=python sts=4 ts=4 sw=4 et: """ + Change directory to provide relative paths for doctests + >>> import os + >>> filepath = os.path.dirname( os.path.realpath( __file__ ) ) + >>> datadir = os.path.realpath(os.path.join(filepath, '../testing/data')) + >>> os.chdir(datadir) + Nipype interface for PETPVC. 
PETPVC is a software tool from the Nuclear Medicine Department
@@ -217,19 +220,3 @@ def _gen_filename(self, name):
         if name == 'out_file':
             return self._list_outputs()['out_file']
         return None
-
-
-if __name__ == '__main__':
-
-    #from .testing import example_data
-    #TODO get data for PETPVC
-
-    pvc = PETPVC()
-    pvc.inputs.in_file = example_data('pet.nii.gz')
-    pvc.inputs.mask_file = example_data('tissues.nii.gz')
-    pvc.inputs.out_file = 'pet_pvc_rbv.nii.gz'
-    pvc.inputs.pvc = 'RBV'
-    pvc.inputs.fwhm_x = 2.0
-    pvc.inputs.fwhm_y = 2.0
-    pvc.inputs.fwhm_z = 2.0
-    pvc.run()
diff --git a/nipype/testing/data/pet.nii.gz b/nipype/testing/data/pet.nii.gz
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/nipype/testing/data/tissues.nii.gz b/nipype/testing/data/tissues.nii.gz
new file mode 100644
index 0000000000..e69de29bb2

From dbd86a827e3d7541abfc19a0d4fe1654b915f21d Mon Sep 17 00:00:00 2001
From: Alexandre Manhaes Savio
Date: Thu, 28 Jan 2016 17:51:16 +0100
Subject: [PATCH 20/22] fix: drop example_data from petpvc

---
 nipype/interfaces/petpvc.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nipype/interfaces/petpvc.py b/nipype/interfaces/petpvc.py
index 898624a48d..bd79f32c8e 100644
--- a/nipype/interfaces/petpvc.py
+++ b/nipype/interfaces/petpvc.py
@@ -147,8 +147,8 @@ class PETPVC(CommandLine):
     >>> from ..testing import example_data
     >>> #TODO get data for PETPVC
     >>> pvc = PETPVC()
-    >>> pvc.inputs.in_file = example_data('pet.nii.gz')
-    >>> pvc.inputs.mask_file = example_data('tissues.nii.gz')
+    >>> pvc.inputs.in_file = 'pet.nii.gz'
+    >>> pvc.inputs.mask_file = 'tissues.nii.gz'
     >>> pvc.inputs.out_file = 'pet_pvc_rbv.nii.gz'
     >>> pvc.inputs.pvc = 'RBV'
     >>> pvc.inputs.fwhm_x = 2.0

From 435004cebce00510db3bf36ae21b2cbf37020f32 Mon Sep 17 00:00:00 2001
From: Alexandre Manhaes Savio
Date: Tue, 2 Feb 2016 17:43:39 +0100
Subject: [PATCH 21/22] add petpvc specs file

---
 nipype/interfaces/tests/test_auto_PETPVC.py | 65 +++++++++++++++++++++
 1 file changed, 65 insertions(+)
 create mode 100644 nipype/interfaces/tests/test_auto_PETPVC.py

diff --git a/nipype/interfaces/tests/test_auto_PETPVC.py b/nipype/interfaces/tests/test_auto_PETPVC.py
new file mode 100644
index 0000000000..67c02c72b0
--- /dev/null
+++ b/nipype/interfaces/tests/test_auto_PETPVC.py
@@ -0,0 +1,65 @@
+# AUTO-GENERATED by tools/checkspecs.py - DO NOT EDIT
+from ...testing import assert_equal
+from ..petpvc import PETPVC
+
+
+def test_PETPVC_inputs():
+    input_map = dict(alpha=dict(argstr='-a %.4f',
+    ),
+    args=dict(argstr='%s',
+    ),
+    debug=dict(argstr='-d',
+    usedefault=True,
+    ),
+    environ=dict(nohash=True,
+    usedefault=True,
+    ),
+    fwhm_x=dict(argstr='-x %.4f',
+    mandatory=True,
+    ),
+    fwhm_y=dict(argstr='-y %.4f',
+    mandatory=True,
+    ),
+    fwhm_z=dict(argstr='-z %.4f',
+    mandatory=True,
+    ),
+    ignore_exception=dict(nohash=True,
+    usedefault=True,
+    ),
+    in_file=dict(argstr='-i %s',
+    mandatory=True,
+    ),
+    mask_file=dict(argstr='-m %s',
+    mandatory=True,
+    ),
+    n_deconv=dict(argstr='-k %d',
+    ),
+    n_iter=dict(argstr='-n %d',
+    ),
+    out_file=dict(argstr='-o %s',
+    genfile=True,
+    hash_files=False,
+    ),
+    pvc=dict(argstr='-p %s',
+    mandatory=True,
+    ),
+    stop_crit=dict(argstr='-s %.4f',
+    ),
+    terminal_output=dict(nohash=True,
+    ),
+    )
+    inputs = PETPVC.input_spec()
+
+    for key, metadata in list(input_map.items()):
+        for metakey, value in list(metadata.items()):
+            yield assert_equal, getattr(inputs.traits()[key], metakey), value
+
+
+def test_PETPVC_outputs():
+    output_map = dict(out_file=dict(),
+    )
+    outputs = 
PETPVC.output_spec()
+
+    for key, metadata in list(output_map.items()):
+        for metakey, value in list(metadata.items()):
+            yield assert_equal, getattr(outputs.traits()[key], metakey), value

From a5a857f0023a206818e76c3a2323d33089155549 Mon Sep 17 00:00:00 2001
From: Alexandre Manhaes Savio
Date: Wed, 3 Feb 2016 13:31:04 +0100
Subject: [PATCH 22/22] fix non-ascii chars in petpvc.py

---
 nipype/interfaces/petpvc.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/nipype/interfaces/petpvc.py b/nipype/interfaces/petpvc.py
index bd79f32c8e..00dbbfb3ea 100644
--- a/nipype/interfaces/petpvc.py
+++ b/nipype/interfaces/petpvc.py
@@ -1,3 +1,5 @@
+# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
+# vi: set ft=python sts=4 ts=4 sw=4 et:
 """
     Change directory to provide relative paths for doctests
     >>> import os
@@ -14,8 +16,8 @@
 The methods that it implements are explained here:
 K. Erlandsson, I. Buvat, P. H. Pretorius, B. A. Thomas, and B. F. Hutton,
-“A review of partial volume correction techniques for emission tomography
-and their applications in neurology, cardiology and oncology,” Phys. Med.
+"A review of partial volume correction techniques for emission tomography
+and their applications in neurology, cardiology and oncology," Phys. Med.
 Biol., vol. 57, no. 21, p. R119, 2012.

 There is a publication waiting to be accepted for this software tool.
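
Two short sketches follow to illustrate the changes above; they are not part
of the patches themselves, and all names and values in them are placeholders.

First, a minimal sketch of driving the PETPVC wrapper from [PATCH 18/22] once
real images are available (the pet.nii.gz and tissues.nii.gz files added in
[PATCH 19/22] are empty doctest stubs, so actual data must be supplied). It
assumes a checkout with these commits applied and the petpvc binary on the
PATH:

    from nipype.interfaces.petpvc import PETPVC

    pvc = PETPVC()
    pvc.inputs.in_file = 'pet.nii.gz'        # input PET image (placeholder name)
    pvc.inputs.mask_file = 'tissues.nii.gz'  # tissue mask (placeholder name)
    pvc.inputs.pvc = 'RBV'                   # region-based voxel-wise correction
    pvc.inputs.fwhm_x = 2.0                  # PSF FWHM in mm along each axis
    pvc.inputs.fwhm_y = 2.0
    pvc.inputs.fwhm_z = 2.0
    res = pvc.run()
    # out_file was not set, so _list_outputs() should derive it from in_file
    # and the lower-cased method name, e.g. pet_rbv_pvc.nii.gz

Second, the zero-padding template guarded in the _write_report_info() hunk of
[PATCH 15/22]. The helper below (index_template is a name introduced here, not
part of nipype) restates that logic in isolation to show what the num_nodes
check avoids:

    import numpy as np

    def index_template(num_nodes):
        # np.log10(0) emits "RuntimeWarning: divide by zero" and returns -inf,
        # so fall back to a width of 0 when the graph has no nodes.
        if num_nodes > 0:
            width = np.ceil(np.log10(num_nodes)).astype(int)
        else:
            width = 0
        return '%%0%dd_' % width

    print(index_template(150) % 7)  # '007_': ceil(log10(150)) gives 3 digits
    print(index_template(0) % 7)    # '7_': width 0 applies no padding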