Skip to content

[WIP] adding mapping and submitter to enh/workflow syntax #2638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions nipype/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class NipypeError(Exception):
pass


class EngineError(Exception):
pass


class PipelineError(NipypeError):
pass


class NodeError(EngineError):
pass


class WorkflowError(NodeError):
pass


class MappingError(NodeError):
pass


class JoinError(NodeError):
pass


class InterfaceError(NipypeError):
pass
1 change: 1 addition & 0 deletions nipype/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def get_nipype_gitversion():
'pydot>=%s' % PYDOT_MIN_VERSION,
'packaging',
'futures; python_version == "2.7"',
'dask',
]

if sys.version_info <= (3, 4):
Expand Down
2 changes: 1 addition & 1 deletion nipype/pipeline/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

from __future__ import absolute_import
__docformat__ = 'restructuredtext'
from .workflows import Workflow
from .workflows import Workflow, NewNode, NewWorkflow
from .nodes import Node, MapNode, JoinNode
from .utils import generate_expanded_graph
206 changes: 206 additions & 0 deletions nipype/pipeline/engine/auxiliary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import pdb
import inspect
from ... import config, logging
logger = logging.getLogger('nipype.workflow')


# Function to change user provided mapper to "reverse polish notation" used in State
def mapper2rpn(mapper):
""" Functions that translate mapper to "reverse polish notation."""
global output_mapper
output_mapper = []
_ordering(mapper, i=0)
return output_mapper


def _ordering(el, i, current_sign=None):
""" Used in the mapper2rpn to get a proper order of fields and signs. """
global output_mapper
if type(el) is tuple:
_iterate_list(el, ".")
elif type(el) is list:
_iterate_list(el, "*")
elif type(el) is str:
output_mapper.append(el)
else:
raise Exception("mapper has to be a string, a tuple or a list")

if i > 0:
output_mapper.append(current_sign)


def _iterate_list(element, sign):
""" Used in the mapper2rpn to get recursion. """
for i, el in enumerate(element):
_ordering(el, i, current_sign=sign)


# functions used in State to know which element should be used for a specific axis

def mapping_axis(state_inputs, mapper_rpn):
"""Having inputs and mapper (in rpn notation), functions returns the axes of output for every input."""
axis_for_input = {}
stack = []
current_axis = None
current_shape = None

for el in mapper_rpn:
if el == ".":
right = stack.pop()
left = stack.pop()
if left == "OUT":
if state_inputs[right].shape == current_shape: #todo:should we allow for one-element array?
axis_for_input[right] = current_axis
else:
raise Exception("arrays for scalar operations should have the same size")

elif right == "OUT":
if state_inputs[left].shape == current_shape:
axis_for_input[left] = current_axis
else:
raise Exception("arrays for scalar operations should have the same size")

else:
if state_inputs[right].shape == state_inputs[left].shape:
current_axis = list(range(state_inputs[right].ndim))
current_shape = state_inputs[left].shape
axis_for_input[left] = current_axis
axis_for_input[right] = current_axis
else:
raise Exception("arrays for scalar operations should have the same size")

stack.append("OUT")

elif el == "*":
right = stack.pop()
left = stack.pop()
if left == "OUT":
axis_for_input[right] = [i + 1 + current_axis[-1]
for i in range(state_inputs[right].ndim)]
current_axis = current_axis + axis_for_input[right]
current_shape = tuple([i for i in current_shape + state_inputs[right].shape])
elif right == "OUT":
for key in axis_for_input:
axis_for_input[key] = [i + state_inputs[left].ndim
for i in axis_for_input[key]]

axis_for_input[left] = [i - len(current_shape) + current_axis[-1] + 1
for i in range(state_inputs[left].ndim)]
current_axis = current_axis + [i + 1 + current_axis[-1]
for i in range(state_inputs[left].ndim)]
current_shape = tuple([i for i in state_inputs[left].shape + current_shape])
else:
axis_for_input[left] = list(range(state_inputs[left].ndim))
axis_for_input[right] = [i + state_inputs[left].ndim
for i in range(state_inputs[right].ndim)]
current_axis = axis_for_input[left] + axis_for_input[right]
current_shape = tuple([i for i in
state_inputs[left].shape + state_inputs[right].shape])
stack.append("OUT")

else:
stack.append(el)

if len(stack) == 0:
pass
elif len(stack) > 1:
raise Exception("exception from mapping_axis")
elif stack[0] != "OUT":
current_axis = [i for i in range(state_inputs[stack[0]].ndim)]
axis_for_input[stack[0]] = current_axis

if current_axis:
ndim = max(current_axis) + 1
else:
ndim = 0
return axis_for_input, ndim


def converting_axis2input(state_inputs, axis_for_input, ndim):
""" Having axes for all the input fields, the function returns fields for each axis. """
input_for_axis = []
shape = []
for i in range(ndim):
input_for_axis.append([])
shape.append(0)

for inp, axis in axis_for_input.items():
for (i, ax) in enumerate(axis):
input_for_axis[ax].append(inp)
shape[ax] = state_inputs[inp].shape[i]

return input_for_axis, shape


# used in the Node to change names in a mapper

def change_mapper(mapper, name):
"""changing names of mapper: adding names of the node"""
if isinstance(mapper, str):
if "-" in mapper:
return mapper
else:
return "{}-{}".format(name, mapper)
elif isinstance(mapper, list):
return _add_name(mapper, name)
elif isinstance(mapper, tuple):
mapper_l = list(mapper)
return tuple(_add_name(mapper_l, name))


def _add_name(mlist, name):
for i, elem in enumerate(mlist):
if isinstance(elem, str):
if "-" in elem:
pass
else:
mlist[i] = "{}-{}".format(name, mlist[i])
elif isinstance(elem, list):
mlist[i] = _add_name(elem, name)
elif isinstance(elem, tuple):
mlist[i] = list(elem)
mlist[i] = _add_name(mlist[i], name)
mlist[i] = tuple(mlist[i])
return mlist


#Function interface

class Function_Interface(object):
""" A new function interface """
def __init__(self, function, output_nm, input_map=None):
self.function = function
if type(output_nm) is list:
self._output_nm = output_nm
else:
raise Exception("output_nm should be a list")
if not input_map:
self.input_map = {}
# TODO use signature
for key in inspect.getargspec(function)[0]:
if key not in self.input_map.keys():
self.input_map[key] = key


def run(self, input):
self.output = {}
if self.input_map:
for (key_fun, key_inp) in self.input_map.items():
try:
input[key_fun] = input.pop(key_inp)
except KeyError:
raise Exception("no {} in the input dictionary".format(key_inp))
fun_output = self.function(**input)
logger.debug("Function Interf, input={}, fun_out={}".format(input, fun_output))
if type(fun_output) is tuple:
if len(self._output_nm) == len(fun_output):
for i, out in enumerate(fun_output):
self.output[self._output_nm[i]] = out
else:
raise Exception("length of output_nm doesnt match length of the function output")
elif len(self._output_nm)==1:
self.output[self._output_nm[0]] = fun_output
else:
raise Exception("output_nm doesnt match length of the function output")

return fun_output
85 changes: 85 additions & 0 deletions nipype/pipeline/engine/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from collections import OrderedDict
import pdb

from . import auxiliary as aux

class State(object):
def __init__(self, state_inputs, node_name, mapper=None):
self.state_inputs = state_inputs

self._mapper = mapper
self.node_name = node_name
if self._mapper:
# changing mapper (as in rpn), so I can read from left to right
# e.g. if mapper=('d', ['e', 'r']), _mapper_rpn=['d', 'e', 'r', '*', '.']
self._mapper_rpn = aux.mapper2rpn(self._mapper)
self._input_names_mapper = [i for i in self._mapper_rpn if i not in ["*", "."]]
else:
self._mapper_rpn = []
self._input_names_mapper = []
# not all input field have to be use in the mapper, can be an extra scalar
self._input_names = list(self.state_inputs.keys())

# dictionary[key=input names] = list of axes related to
# e.g. {'r': [1], 'e': [0], 'd': [0, 1]}
# ndim - int, number of dimension for the "final array" (that is not created)
self._axis_for_input, self._ndim = aux.mapping_axis(self.state_inputs, self._mapper_rpn)

# list of inputs variable for each axis
# e.g. [['e', 'd'], ['r', 'd']]
# shape - list, e.g. [2,3]
self._input_for_axis, self._shape = aux.converting_axis2input(self.state_inputs,
self._axis_for_input, self._ndim)

# list of all possible indexes in each dim, will be use to iterate
# e.g. [[0, 1], [0, 1, 2]]
self._all_elements = [range(i) for i in self._shape]


def __getitem__(self, key):
if type(key) is int:
key = (key,)
return self.state_values(key)

@property
def all_elements(self):
return self._all_elements

# not used?
#@property
#def mapper(self):
# return self._mapper


@property
def ndim(self):
return self._ndim


@property
def shape(self):
return self._shape


def state_values(self, ind):
if len(ind) > self._ndim:
raise IndexError("too many indices")

for ii, index in enumerate(ind):
if index > self._shape[ii] - 1:
raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii]))

state_dict = {}
for input, ax in self._axis_for_input.items():
# checking which axes are important for the input
sl_ax = slice(ax[0], ax[-1]+1)
# taking the indexes for the axes
ind_inp = tuple(ind[sl_ax]) #used to be list
state_dict[input] = self.state_inputs[input][ind_inp]
# adding values from input that are not used in the mapper
for input in set(self._input_names) - set(self._input_names_mapper):
state_dict[input] = self.state_inputs[input]

# in py3.7 we can skip OrderedDict
# returning a named tuple?
return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0]))
Loading