[WIP,ENH] Conditional Nodes and Workflows #1299

Closed
oesteban wants to merge 62 commits into master from enh/ControlNodes

Commits
0fb1349
added ConditionalNode
oesteban Dec 14, 2015
a12ab6d
add ConditionalNode import
oesteban Dec 14, 2015
923fba6
simplify doctest for ConditionalNode
oesteban Dec 14, 2015
3fc9845
fix error when importing ConditionalNode
oesteban Dec 14, 2015
080446a
add new CheckInterface
oesteban Dec 14, 2015
0af3ce0
fixing doctests
oesteban Dec 14, 2015
b18b361
add new documentation file
oesteban Dec 14, 2015
a7ec6bb
initialize documentation for runtime decisions
oesteban Dec 14, 2015
3e98c37
introduce CachedWorkflows in documentation
oesteban Dec 14, 2015
f9a8f1c
get node inputs individually
oesteban Dec 14, 2015
d421182
add CachedWorkflow
oesteban Dec 14, 2015
ce3892e
undo rewrite of _get_inputs
oesteban Dec 14, 2015
9af37c1
an early functional version of CachedWorkflows
oesteban Dec 14, 2015
9e52d59
fix build AttributeError _check_result
oesteban Dec 14, 2015
72a4f51
add ConditionalWorkflow and derived CachedWorkflow from it
oesteban Dec 15, 2015
cdc27d7
final improvements
oesteban Dec 15, 2015
cc75ff7
Merge branch 'fix/CheckInputsLoggingHashDiff' into enh/ControlNodes
oesteban Dec 15, 2015
5094349
update CHANGES
oesteban Dec 15, 2015
b8e7c0c
Merge branch 'master' into enh/ControlNodes
oesteban Dec 15, 2015
b9b7c74
improve error message
oesteban Dec 15, 2015
7013b47
add tests
oesteban Dec 15, 2015
6f3a3aa
improve information reporting failed connection
oesteban Dec 15, 2015
b863fc0
simplifiy conditionalworkflow
oesteban Dec 16, 2015
9440077
Split nodes and workflows in engine
oesteban Dec 16, 2015
993039d
refactoring pipeline.engine
oesteban Dec 16, 2015
57684f2
cleaner implementation of ConditionalNode
oesteban Dec 16, 2015
b246756
refactoring control nodes and workflow
oesteban Dec 17, 2015
f260a88
adding new connection types (data, control)
oesteban Dec 18, 2015
4cb5aaf
fix use of logger before definition
oesteban Dec 18, 2015
1d3f580
Integrating CachedWorkflows in new engine
oesteban Dec 18, 2015
d318a7c
fix tests
oesteban Dec 18, 2015
84ae0f0
fix imports for tests
oesteban Dec 18, 2015
cbf10aa
fix several errors
oesteban Dec 18, 2015
5fc702d
Merge branch 'master' into enh/ControlNodes
oesteban Dec 18, 2015
7f32b1b
Solving too many values to unpack and imports
oesteban Dec 18, 2015
0a6512d
fixing paths and imports ...
oesteban Dec 18, 2015
d055d30
fixing errors and doctests
oesteban Dec 18, 2015
8d83ad7
temporarily disable specific tests
oesteban Dec 18, 2015
7fe023f
fix __init__
oesteban Dec 18, 2015
df5310e
add doctest for disable signal
oesteban Dec 19, 2015
d4d526e
add regression test
oesteban Dec 19, 2015
633705e
add testing nested workflows and disable
oesteban Dec 19, 2015
9fa9fc7
add regression test for CachedWorkflow
oesteban Dec 20, 2015
19e2d58
add new tests, fix workflows
oesteban Dec 20, 2015
ab1c59b
fix doctest
oesteban Dec 20, 2015
2d5728d
fixing logical errors in connect()
oesteban Dec 21, 2015
1d52259
add test case
oesteban Dec 21, 2015
2986f62
fix exception not raised
oesteban Dec 21, 2015
3f8f5f2
still fixing tests
oesteban Dec 21, 2015
0f6615b
fix error checking if workflow contains itself
oesteban Dec 21, 2015
d6894b2
CachedWorkflow test does not crash now
oesteban Dec 21, 2015
3dd5464
use sets to gather workflow inputs
oesteban Dec 21, 2015
d2c322e
report all duplicated connections
oesteban Dec 21, 2015
71edbdb
for some reason these tests would not pass
oesteban Dec 21, 2015
9c10acd
restore old tests, several fixes
oesteban Dec 22, 2015
c75788f
fix last test failing
oesteban Dec 22, 2015
d62950b
update documentation, fix error in circleci
oesteban Dec 22, 2015
a84fe07
propagate signals first
oesteban Dec 22, 2015
1726c85
Add signals to report
oesteban Dec 23, 2015
65378f3
add _control attr to fix race condition in nesting like
oesteban Dec 23, 2015
308f568
fix writting signals to report
oesteban Dec 23, 2015
010e6da
Merge branch 'master' into enh/ControlNodes
oesteban Dec 26, 2015
1 change: 1 addition & 0 deletions CHANGES
@@ -1,6 +1,7 @@
Next release
============

* ENH: Introduced runtime decisions (https://github.com/nipy/nipype/pull/1299)
* ENH: Provides a Nipype wrapper for ANTs DenoiseImage (https://github.com/nipy/nipype/pull/1291)
* FIX: Minor bugfix logging hash differences (https://github.com/nipy/nipype/pull/1298)
* FIX: Use released Prov python library (https://github.com/nipy/nipype/pull/1279)
1 change: 1 addition & 0 deletions doc/users/index.rst
@@ -33,6 +33,7 @@
function_interface
mapnode_and_iterables
joinnode_and_itersource
runtime_decisions
model_specification
saving_workflows
spmmcr
120 changes: 120 additions & 0 deletions doc/users/runtime_decisions.rst
@@ -0,0 +1,120 @@
.. _runtime_decisions:

===========================
Runtime decisions in nipype
===========================

Adding conditional execution (https://github.com/nipy/nipype/issues/878) and
other runtime decisions (https://github.com/nipy/nipype/issues/819) to
nipype is a long-standing request. Here we introduce some logic and
signalling into the workflows.

Disable signal in nodes
=======================

The :class:`nipype.pipeline.engine.Node` now provides a `signals` attribute
with a `disable` signal by default.
When the `run()` method of a node is called, the interface will run
normally *iff* `disable` is `False` (the default).

Example
-------

For instance, the following code will run the BET interface from fsl:

>>> from nipype.pipeline.engine import Node
>>> from nipype.interfaces import fsl
>>> bet = Node(fsl.BET(), 'BET')
>>> bet.inputs.in_file = 'T1.nii'
>>> bet.run() # doctest: +SKIP

However, if we set the disable signal, then the interface is not run.

>>> bet.signals.disable = True
>>> bet.run() is None
True

Disable signal in Workflow
==========================

:class:`nipype.pipeline.engine.Workflow` also provides signals, including
`disable` by default.
The output of a node can also be connected to a signal of a workflow,
using the `signalnode.<name-of-signal>` port.


Example
-------

>>> from nipype.pipeline import engine as pe
>>> from nipype.interfaces import utility as niu
>>> def _myfunc(val):
... return val + 1
>>> wf = pe.Workflow('TestDisableWorkflow')
>>> inputnode = pe.Node(niu.IdentityInterface(
... fields=['in_value']), 'inputnode')
>>> outputnode = pe.Node(niu.IdentityInterface(
... fields=['out_value']), 'outputnode')
>>> func = pe.Node(niu.Function(
... input_names=['val'], output_names=['out'],
... function=_myfunc), 'functionnode')
>>> wf.connect([
... (inputnode, func, [('in_value', 'val')]),
... (func, outputnode, [('out', 'out_value')])
... ])
>>> wf.inputs.inputnode.in_value = 0
>>> wf.run() # Will produce 1 in outputnode.out_value

The workflow can be disabled:

>>> wf.signals.disable = True
>>> wf.run() # The outputnode.out_value remains <undefined>
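
The `signalnode.<name-of-signal>` port also allows driving the `disable`
signal from the output of another node. The following is a minimal sketch of
that pattern (not executed here); the boolean function node `check` and the
outer workflow are hypothetical:

>>> def _too_small(val):
...     return val < 1
>>> check = pe.Node(niu.Function(
...     input_names=['val'], output_names=['out'],
...     function=_too_small), 'checknode')
>>> check.inputs.val = 0
>>> outer = pe.Workflow('OuterWorkflow')
>>> outer.connect([
...     (check, wf, [('out', 'signalnode.disable')])
... ])  # doctest: +SKIP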


CachedWorkflow
==============

The :class:`nipype.pipeline.engine.CachedWorkflow` is a conditional workflow
that is executed *iff* its cached inputs are not set.
More precisely, the workflow skips the execution of its nodes when all the
inputs of the special input node called `cachenode` are set.
This feature was requested, for instance, in
https://github.com/nipy/nipype/pull/1081.
The implementation makes use of :class:`nipype.interfaces.utility.CheckInterface`,
which produces an output `out` set to `True` if any/all of its inputs are
defined and `False` otherwise.
The `operation` input switches between the *any* and *all* conditions.
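
As a quick illustration of the check driving this behaviour (adapted from
the doctest of `CheckInterface` itself):

>>> from nipype.interfaces.utility import CheckInterface
>>> checkif = CheckInterface(fields=['a', 'b'], operation='any')
>>> checkif.inputs.a = 'foo'
>>> checkif._list_outputs()['out']  # 'any' is satisfied with only 'a' set
True
>>> checkif.inputs.operation = 'all'
>>> checkif._list_outputs()['out']  # 'all' requires both 'a' and 'b'
False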


Example
-------

>>> from nipype.pipeline import engine as pe
>>> from nipype.interfaces import utility as niu
>>> def _myfunc(a, b):
... return a + b
>>> wf = pe.CachedWorkflow('InnerWorkflow',
... cache_map=('c', 'out'))
>>> inputnode = pe.Node(niu.IdentityInterface(
... fields=['a', 'b']), 'inputnode')
>>> func = pe.Node(niu.Function(
... input_names=['a', 'b'], output_names=['out'],
... function=_myfunc), 'functionnode')
>>> wf.connect([
... (inputnode, func, [('a', 'a'), ('b', 'b')]),
... (func, 'output', [('out', 'out')])
... ])
>>> wf.inputs.inputnode.a = 2
>>> wf.inputs.inputnode.b = 3
>>> wf.run() # Will generate 5 in outputnode.out

Please note that the output node should be referred to as 'output' in
the *connect()* call.

If we set all the inputs of the cache, then the workflow is skipped and
the output is mapped from the cache:

>>> wf.inputs.cachenode.c = 7
>>> wf.run() # Will produce 7 in outputnode.out
2 changes: 1 addition & 1 deletion nipype/__init__.py
@@ -83,6 +83,6 @@ def _test_local_install():
pass


from .pipeline import Node, MapNode, JoinNode, Workflow
from .pipeline import engine
from .interfaces import (DataGrabber, DataSink, SelectFiles,
IdentityInterface, Rename, Function, Select, Merge)
2 changes: 1 addition & 1 deletion nipype/caching/memory.py
@@ -21,7 +21,7 @@

from ..interfaces.base import BaseInterface
from ..pipeline.engine import Node
from ..pipeline.utils import modify_paths
from ..pipeline.engine.utils import modify_paths

################################################################################
# PipeFunc object: callable interface to nipype.interface objects
2 changes: 1 addition & 1 deletion nipype/caching/tests/test_memory.py
@@ -7,7 +7,7 @@
from nose.tools import assert_equal

from .. import Memory
from ...pipeline.tests.test_engine import TestInterface
from ...pipeline.engine.tests.test_engine import TestInterface

from ... import config
config.set_default_config()
7 changes: 4 additions & 3 deletions nipype/interfaces/base.py
@@ -40,9 +40,8 @@
TraitListObject, TraitError,
isdefined, File, Directory,
has_metadata)
from ..utils.filemanip import (md5, hash_infile, FileNotFoundError,
hash_timestamp, save_json,
split_filename)
from ..utils.filemanip import md5, \
FileNotFoundError, save_json, split_filename
from ..utils.misc import is_container, trim, str2bool
from ..utils.provenance import write_provenance
from .. import config, logging, LooseVersion
@@ -463,6 +462,7 @@ def _deprecated_warn(self, obj, name, old, new):

def _hash_infile(self, adict, key):
""" Inject file hashes into adict[key]"""
from nipype.utils.filemanip import hash_infile, hash_timestamp
stuff = adict[key]
if not is_container(stuff):
stuff = [stuff]
@@ -578,6 +578,7 @@ def get_hashval(self, hash_method=None):

def _get_sorteddict(self, object, dictwithhash=False, hash_method=None,
hash_files=True):
from nipype.utils.filemanip import hash_infile, hash_timestamp
if isinstance(object, dict):
out = []
for key, val in sorted(object.items()):
3 changes: 2 additions & 1 deletion nipype/interfaces/io.py
@@ -920,7 +920,8 @@ class SelectFiles(IOBase):
--------

>>> import pprint
>>> from nipype import SelectFiles, Node
>>> from nipype.pipeline.engine import Node
>>> from nipype.interfaces.io import SelectFiles
>>> templates={"T1": "{subject_id}/struct/T1.nii",
... "epi": "{subject_id}/func/f[0, 1].nii"}
>>> dg = Node(SelectFiles(templates), "selectfiles")
81 changes: 81 additions & 0 deletions nipype/interfaces/utility.py
@@ -566,3 +566,84 @@ def _list_outputs(self):
entry = self._parse_line(line)
outputs = self._append_entry(outputs, entry)
return outputs


class CheckInterfaceOutputSpec(TraitedSpec):
out = traits.Bool(False, desc='Inputs meet condition')


class CheckInterface(IOBase):
"""
Interface that performs checks on inputs

Examples
--------

>>> from nipype.interfaces.utility import CheckInterface
>>> checkif = CheckInterface(fields=['a', 'b'], operation='any')
>>> checkif._list_outputs()['out']
False

>>> checkif.inputs.a = 'foo'
>>> out = checkif.run()
>>> checkif._list_outputs()['out']
True

>>> checkif.inputs.operation = 'all'
>>> out = checkif.run()
>>> checkif._list_outputs()['out']
False

>>> checkif.inputs.b = 'bar'
>>> out = checkif.run()
>>> checkif._list_outputs()['out']
True
"""
input_spec = DynamicTraitedSpec
output_spec = CheckInterfaceOutputSpec
_always_run = True

def __init__(self, fields=None, operation='all', **inputs):
super(CheckInterface, self).__init__(**inputs)

if fields is None or not fields:
raise ValueError('CheckInterface fields must be a non-empty '
'list')

if 'operation' in fields:
raise ValueError('CheckInterface does not allow fields using'
' the special name \'operation\'')
# Each input must be in the fields.
for in_field in inputs:
if in_field not in fields:
raise ValueError('CheckInterface input is not in the '
'fields: %s' % in_field)
self._fields = fields
add_traits(self.inputs, fields + ['operation'])

# Adding any traits wipes out all input values set in superclass initialization,
# even if the trait is not in the add_traits argument. The work-around is to reset
# the values after adding the traits.
self.inputs.set(**inputs)

if operation not in ['all', 'any']:
raise ValueError('CheckInterface does not accept keyword '
'\'%s\' as operation input' % operation)
self.inputs.operation = operation

def _check_result(self):
if self.inputs.operation not in ['all', 'any']:
raise ValueError('CheckInterface does not accept keyword '
'\'%s\' as operation input' % self.inputs.operation)

results = [isdefined(getattr(self.inputs, key))
for key in self._fields if key != 'operation']

if self.inputs.operation == 'any':
return any(results)
return all(results)

def _list_outputs(self):
outputs = self._outputs().get()
outputs['out'] = self._check_result()
return outputs
3 changes: 2 additions & 1 deletion nipype/pipeline/__init__.py
@@ -7,4 +7,5 @@

from __future__ import absolute_import
__docformat__ = 'restructuredtext'
from .engine import Node, MapNode, JoinNode, Workflow

from .engine import *
7 changes: 7 additions & 0 deletions nipype/pipeline/engine/__init__.py
@@ -0,0 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:

from .workflows import *
from .nodes import *