From f8e460a7358e2773fcf40e52ac10cf9ebb33177c Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Mon, 2 Jul 2018 11:24:47 -0400 Subject: [PATCH 01/16] ENH: First pass at new workflow syntax --- nipype/pipeline/engine/workflows.py | 109 +++++++++++++++++++++++++++- 1 file changed, 108 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index a10dabef30..b8d20ffd0c 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -22,6 +22,7 @@ import networkx as nx from ... import config, logging +from ...exceptions import NodeError, WorkflowError, MappingError, JoinError from ...utils.misc import str2bool from ...utils.functions import (getsource, create_function_from_source) @@ -33,7 +34,7 @@ get_print_name, merge_dict, format_node) from .base import EngineBase -from .nodes import MapNode +from .nodes import MapNode, Node # Py2 compat: http://python-future.org/compatible_idioms.html#collections-counter-and-ordereddict from future import standard_library @@ -1043,3 +1044,109 @@ def _get_dot(self, vname1.replace('.', '_'))) logger.debug('cross connection: %s', dotlist[-1]) return ('\n' + prefix).join(dotlist) + + def add(self, name, node_like): + if is_interface(node_like): + node = Node(node_like, name=name) + elif is_node(node_like): + node = node_like + + self.add_nodes([node]) + + +class Map(Node): + pass + + +class Join(Node): + pass + + +class MapState(object): + pass + +class NewNode(EngineBase): + def __init__(self, inputs={}, map_on=None, join_by=None, + *args, **kwargs): + self._mappers = {} + self._joiners = {} + + def map(self, field, values=None): + if isinstance(field, list): + for field_ + if values is not None: + if len(values != len(field)): + elif isinstance(field, tuple): + pass + if values is None: + values = getattr(self._inputs, field) + if values is None: + raise MappingError('Cannot map unassigned input field') + self._mappers[field] = values + + def join(self, field): + pass + + +class NewWorkflow(NewNode): + def __init__(self, inputs={}, *args, **kwargs): + super(NewWorkflow, self).__init__(*args, **kwargs) + + self._nodes = {} + + mro = self.__class__.mro() + wf_klasses = mro[:mro.index(NewWorkflow)][::-1] + items = {} + for klass in wf_klasses: + items.update(klass.__dict__) + for name, runnable in items.items(): + if name in ('__module__', '__doc__'): + continue + + self.add(name, value) + + def add(self, name, runnable): + if is_function(runnable): + node = Node(Function(function=runnable), name=name) + elif is_interface(runnable): + node = Node(runnable, name=name) + elif is_node(runnable): + node = runnable if runnable.name == name else runnable.clone(name=name) + else: + raise ValueError("Unknown workflow element: {!r}".format(runnable)) + setattr(self, name, node) + self._nodes[name] = node + self._last_added = name + + def map(self, field, node=None, values=None): + if node is None: + if '.' in field: + node, field = field.rsplit('.', 1) + else: + node = self._last_added + + if '.' 
in node: + subwf, node = node.split('.', 1) + self._nodes[subwf].map(field, node, values) + return + + if node in self._mappers: + raise WorkflowError("Cannot assign two mappings to the same input") + + self._mappers[node] = (field, values) + + def join(self, field, node=None): + pass + + +def is_function(obj): + return hasattr(obj, '__call__') + + +def is_interface(obj): + return all(hasattr(obj, protocol) + for protocol in ('input_spec', 'output_spec', 'run')) + + +def is_node(obj): + return hasattr(obj, itername) From a646483de3993ae55678c3a9ef50677ef86409d6 Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Mon, 2 Jul 2018 14:38:02 -0400 Subject: [PATCH 02/16] ENH: Add nipype exceptions --- nipype/exceptions.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 nipype/exceptions.py diff --git a/nipype/exceptions.py b/nipype/exceptions.py new file mode 100644 index 0000000000..19880a22ab --- /dev/null +++ b/nipype/exceptions.py @@ -0,0 +1,26 @@ +class NipypeError(Exception): + pass + + +class PipelineError(NipypeError): + pass + + +class NodeError(EngineError): + pass + + +class WorkflowError(NodeError): + pass + + +class MappingError(NodeError): + pass + + +class JoinError(NodeError): + pass + + +class InterfaceError(NipypeError): + pass From f568d782f06c8e6a26f735bd1d4784bc7a70f574 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Wed, 11 Jul 2018 22:55:57 -0400 Subject: [PATCH 03/16] Adding auxiliary and state from nipype2_tmp --- nipype/pipeline/engine/auxiliary.py | 206 ++++++++++++++++++ nipype/pipeline/engine/state.py | 85 ++++++++ .../pipeline/engine/tests/test_auxiliary.py | 73 +++++++ 3 files changed, 364 insertions(+) create mode 100644 nipype/pipeline/engine/auxiliary.py create mode 100644 nipype/pipeline/engine/state.py create mode 100644 nipype/pipeline/engine/tests/test_auxiliary.py diff --git a/nipype/pipeline/engine/auxiliary.py b/nipype/pipeline/engine/auxiliary.py new file mode 100644 index 0000000000..b477801004 --- /dev/null +++ b/nipype/pipeline/engine/auxiliary.py @@ -0,0 +1,206 @@ +import pdb +import inspect +from .. import config, logging +logger = logging.getLogger('workflow') + + +# Function to change user provided mapper to "reverse polish notation" used in State +def mapper2rpn(mapper): + """ Functions that translate mapper to "reverse polish notation.""" + global output_mapper + output_mapper = [] + _ordering(mapper, i=0) + return output_mapper + + +def _ordering(el, i, current_sign=None): + """ Used in the mapper2rpn to get a proper order of fields and signs. """ + global output_mapper + if type(el) is tuple: + _iterate_list(el, ".") + elif type(el) is list: + _iterate_list(el, "*") + elif type(el) is str: + output_mapper.append(el) + else: + raise Exception("mapper has to be a string, a tuple or a list") + + if i > 0: + output_mapper.append(current_sign) + + +def _iterate_list(element, sign): + """ Used in the mapper2rpn to get recursion. 
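E.g. mapper2rpn(["a", ("b", "c")]) yields ["a", "b", "c", ".", "*"]: each group is walked left to right and its sign ("." for tuples, "*" for lists) is appended after every element but the first (cf. test_auxiliary.py). 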
""" + for i, el in enumerate(element): + _ordering(el, i, current_sign=sign) + + +# functions used in State to know which element should be used for a specific axis + +def mapping_axis(state_inputs, mapper_rpn): + """Having inputs and mapper (in rpn notation), functions returns the axes of output for every input.""" + axis_for_input = {} + stack = [] + current_axis = None + current_shape = None + + for el in mapper_rpn: + if el == ".": + right = stack.pop() + left = stack.pop() + if left == "OUT": + if state_inputs[right].shape == current_shape: #todo:should we allow for one-element array? + axis_for_input[right] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + elif right == "OUT": + if state_inputs[left].shape == current_shape: + axis_for_input[left] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + else: + if state_inputs[right].shape == state_inputs[left].shape: + current_axis = list(range(state_inputs[right].ndim)) + current_shape = state_inputs[left].shape + axis_for_input[left] = current_axis + axis_for_input[right] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + stack.append("OUT") + + elif el == "*": + right = stack.pop() + left = stack.pop() + if left == "OUT": + axis_for_input[right] = [i + 1 + current_axis[-1] + for i in range(state_inputs[right].ndim)] + current_axis = current_axis + axis_for_input[right] + current_shape = tuple([i for i in current_shape + state_inputs[right].shape]) + elif right == "OUT": + for key in axis_for_input: + axis_for_input[key] = [i + state_inputs[left].ndim + for i in axis_for_input[key]] + + axis_for_input[left] = [i - len(current_shape) + current_axis[-1] + 1 + for i in range(state_inputs[left].ndim)] + current_axis = current_axis + [i + 1 + current_axis[-1] + for i in range(state_inputs[left].ndim)] + current_shape = tuple([i for i in state_inputs[left].shape + current_shape]) + else: + axis_for_input[left] = list(range(state_inputs[left].ndim)) + axis_for_input[right] = [i + state_inputs[left].ndim + for i in range(state_inputs[right].ndim)] + current_axis = axis_for_input[left] + axis_for_input[right] + current_shape = tuple([i for i in + state_inputs[left].shape + state_inputs[right].shape]) + stack.append("OUT") + + else: + stack.append(el) + + if len(stack) == 0: + pass + elif len(stack) > 1: + raise Exception("exception from mapping_axis") + elif stack[0] != "OUT": + current_axis = [i for i in range(state_inputs[stack[0]].ndim)] + axis_for_input[stack[0]] = current_axis + + if current_axis: + ndim = max(current_axis) + 1 + else: + ndim = 0 + return axis_for_input, ndim + + +def converting_axis2input(state_inputs, axis_for_input, ndim): + """ Having axes for all the input fields, the function returns fields for each axis. 
""" + input_for_axis = [] + shape = [] + for i in range(ndim): + input_for_axis.append([]) + shape.append(0) + + for inp, axis in axis_for_input.items(): + for (i, ax) in enumerate(axis): + input_for_axis[ax].append(inp) + shape[ax] = state_inputs[inp].shape[i] + + return input_for_axis, shape + + +# used in the Node to change names in a mapper + +def change_mapper(mapper, name): + """changing names of mapper: adding names of the node""" + if isinstance(mapper, str): + if "-" in mapper: + return mapper + else: + return "{}-{}".format(name, mapper) + elif isinstance(mapper, list): + return _add_name(mapper, name) + elif isinstance(mapper, tuple): + mapper_l = list(mapper) + return tuple(_add_name(mapper_l, name)) + + +def _add_name(mlist, name): + for i, elem in enumerate(mlist): + if isinstance(elem, str): + if "-" in elem: + pass + else: + mlist[i] = "{}-{}".format(name, mlist[i]) + elif isinstance(elem, list): + mlist[i] = _add_name(elem, name) + elif isinstance(elem, tuple): + mlist[i] = list(elem) + mlist[i] = _add_name(mlist[i], name) + mlist[i] = tuple(mlist[i]) + return mlist + + +#Function interface + +class Function_Interface(object): + """ A new function interface """ + def __init__(self, function, output_nm, input_map=None): + self.function = function + if type(output_nm) is list: + self._output_nm = output_nm + else: + raise Exception("output_nm should be a list") + if not input_map: + self.input_map = {} + # TODO use signature + for key in inspect.getargspec(function)[0]: + if key not in self.input_map.keys(): + self.input_map[key] = key + + + def run(self, input): + self.output = {} + if self.input_map: + for (key_fun, key_inp) in self.input_map.items(): + try: + input[key_fun] = input.pop(key_inp) + except KeyError: + raise Exception("no {} in the input dictionary".format(key_inp)) + fun_output = self.function(**input) + logger.debug("Function Interf, input={}, fun_out={}".format(input, fun_output)) + if type(fun_output) is tuple: + if len(self._output_nm) == len(fun_output): + for i, out in enumerate(fun_output): + self.output[self._output_nm[i]] = out + else: + raise Exception("length of output_nm doesnt match length of the function output") + elif len(self._output_nm)==1: + self.output[self._output_nm[0]] = fun_output + else: + raise Exception("output_nm doesnt match length of the function output") + + return fun_output diff --git a/nipype/pipeline/engine/state.py b/nipype/pipeline/engine/state.py new file mode 100644 index 0000000000..68d82137f0 --- /dev/null +++ b/nipype/pipeline/engine/state.py @@ -0,0 +1,85 @@ +from collections import OrderedDict + +from . import auxiliary as aux + +class State(object): + def __init__(self, state_inputs, node_name, mapper=None): + self.state_inputs = state_inputs + + self._mapper = mapper + self.node_name = node_name + if self._mapper: + # changing mapper (as in rpn), so I can read from left to right + # e.g. if mapper=('d', ['e', 'r']), _mapper_rpn=['d', 'e', 'r', '*', '.'] + self._mapper_rpn = aux.mapper2rpn(self._mapper) + self._input_names_mapper = [i for i in self._mapper_rpn if i not in ["*", "."]] + else: + self._mapper_rpn = [] + self._input_names_mapper = [] + # not all input field have to be use in the mapper, can be an extra scalar + self._input_names = list(self.state_inputs.keys()) + + # dictionary[key=input names] = list of axes related to + # e.g. 
{'r': [1], 'e': [0], 'd': [0, 1]} + # ndim - int, number of dimension for the "final array" (that is not created) + self._axis_for_input, self._ndim = aux.mapping_axis(self.state_inputs, self._mapper_rpn) + + # list of inputs variable for each axis + # e.g. [['e', 'd'], ['r', 'd']] + # shape - list, e.g. [2,3] + self._input_for_axis, self._shape = aux.converting_axis2input(self.state_inputs, + self._axis_for_input, self._ndim) + + # list of all possible indexes in each dim, will be use to iterate + # e.g. [[0, 1], [0, 1, 2]] + self._all_elements = [range(i) for i in self._shape] + + + def __getitem__(self, key): + if type(key) is int: + key = (key,) + return self.state_values(key) + + @property + def all_elements(self): + return self._all_elements + + # not used? + #@property + #def mapper(self): + # return self._mapper + + + @property + def ndim(self): + return self._ndim + + + @property + def shape(self): + return self._shape + + + def state_values(self, ind): + if len(ind) > self._ndim: + raise IndexError("too many indices") + + for ii, index in enumerate(ind): + if index > self._shape[ii] - 1: + raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii])) + + state_dict = {} + for input, ax in self._axis_for_input.items(): + # checking which axes are important for the input + sl_ax = slice(ax[0], ax[-1]+1) + # taking the indexes for the axes + ind_inp = ind[sl_ax] + state_dict[input] = self.state_inputs[input][ind_inp] + + # adding values from input that are not used in the mapper + for input in set(self._input_names) - set(self._input_names_mapper): + state_dict[input] = self.state_inputs[input] + + # in py3.7 we can skip OrderedDict + # returning a named tuple? + return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0])) \ No newline at end of file diff --git a/nipype/pipeline/engine/tests/test_auxiliary.py b/nipype/pipeline/engine/tests/test_auxiliary.py new file mode 100644 index 0000000000..d5d3ffb536 --- /dev/null +++ b/nipype/pipeline/engine/tests/test_auxiliary.py @@ -0,0 +1,73 @@ +from .. 
import auxiliary as aux + +import numpy as np +import pytest + +@pytest.mark.parametrize("mapper, rpn", + [ + ("a", ["a"]), + (("a", "b"), ["a", "b", "."]), + (["a", "b"], ["a", "b", "*"]), + (["a", ("b", "c")], ["a", "b", "c", ".", "*"]), + ([("a", "b"), "c"], ["a", "b", ".", "c", "*"]), + (["a", ("b", ["c", "d"])], ["a", "b", "c", "d", "*", ".", "*"]) + ]) +def test_mapper2rpn(mapper, rpn): + assert aux.mapper2rpn(mapper) == rpn + + +@pytest.mark.parametrize("mapper, mapper_changed", + [ + ("a", "Node-a"), + (["a", ("b", "c")], ["Node-a", ("Node-b", "Node-c")]), + (("a", ["b", "c"]), ("Node-a", ["Node-b", "Node-c"])) + ]) +def test_change_mapper(mapper, mapper_changed): + assert aux.change_mapper(mapper, "Node") == mapper_changed + + +@pytest.mark.parametrize("inputs, rpn, expected", + [ + ({"a": np.array([1, 2])}, ["a"], {"a": [0]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4])}, ["a", "b", "."], {"a": [0], "b": [0]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, ["a", "b", "*"], {"a": [0], "b": [1]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, ["a", "b", ".", "c", "*"], + {"a": [0], "b": [0], "c": [1]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, + ["c", "a", "b", ".", "*"], {"a": [1], "b": [1], "c": [0]}), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])}, + ["a", "b", ".", "c", "*"], {"a": [0, 1], "b": [0, 1], "c": [2]}), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), + "c": np.array([1, 2, 3])}, ["c", "a", "b", ".", "*"], {"a": [1, 2], "b": [1, 2], "c": [0]}) + ]) +def test_mapping_axis(inputs, rpn, expected): + res = aux.mapping_axis(inputs, rpn)[0] + print(res) + for key in inputs.keys(): + assert res[key] == expected[key] + + +def test_mapping_axis_error(): + with pytest.raises(Exception): + aux.mapping_axis({"a": np.array([1, 2]), "b": np.array([3, 4, 5])}, ["a", "b", "."]) + + +@pytest.mark.parametrize("inputs, axis_inputs, ndim, expected", + [ + ({"a": np.array([1, 2])}, {"a": [0]}, 1, [["a"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4])}, {"a": [0], "b": [0]}, 1, + [["a", "b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, {"a": [0], "b": [1]}, 2, + [["a"], ["b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, + {"a": [0], "b": [0], "c": [1]}, 2, [["a", "b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, + {"a": [1], "b": [1], "c": [0]}, 2, [["c"], ["a", "b"]]), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])}, + {"a": [0, 1], "b": [0, 1], "c": [2]}, 3, [["a", "b"], ["a", "b"], ["c"]]), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), + "c": np.array([1, 2, 3])}, {"a": [1, 2], "b": [1, 2], "c": [0]}, 3, + [["c"], ["a", "b"], ["a", "b"]]) + ]) +def test_converting_axis2input(inputs, axis_inputs, ndim, expected): + aux.converting_axis2input(inputs, axis_inputs, ndim)[0] == expected \ No newline at end of file From 682248a7f31e51a85541b8355e4c6f58a10a53cb Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Fri, 13 Jul 2018 12:00:16 -0400 Subject: [PATCH 04/16] updating exceptions and __init__; updating logger to the current version in nipype --- nipype/exceptions.py | 4 ++++ nipype/pipeline/engine/__init__.py | 2 +- nipype/pipeline/engine/auxiliary.py | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/nipype/exceptions.py b/nipype/exceptions.py index 
19880a22ab..bb914cc4e0 100644 --- a/nipype/exceptions.py +++ b/nipype/exceptions.py @@ -2,6 +2,10 @@ class NipypeError(Exception): pass +class EngineError(Exception): + pass + + class PipelineError(NipypeError): pass diff --git a/nipype/pipeline/engine/__init__.py b/nipype/pipeline/engine/__init__.py index e950086307..2a8ca8850f 100644 --- a/nipype/pipeline/engine/__init__.py +++ b/nipype/pipeline/engine/__init__.py @@ -9,6 +9,6 @@ from __future__ import absolute_import __docformat__ = 'restructuredtext' -from .workflows import Workflow +from .workflows import Workflow, NewNode, NewWorkflow from .nodes import Node, MapNode, JoinNode from .utils import generate_expanded_graph diff --git a/nipype/pipeline/engine/auxiliary.py b/nipype/pipeline/engine/auxiliary.py index b477801004..16e15f9906 100644 --- a/nipype/pipeline/engine/auxiliary.py +++ b/nipype/pipeline/engine/auxiliary.py @@ -1,7 +1,7 @@ import pdb import inspect -from .. import config, logging -logger = logging.getLogger('workflow') +from ... import config, logging +logger = logging.getLogger('nipype.workflow') # Function to change user provided mapper to "reverse polish notation" used in State From 7b3bba66a87135177d9b90b6c54be6aeb14baca3 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Fri, 13 Jul 2018 12:05:19 -0400 Subject: [PATCH 05/16] starting updating NewNode using mapper and state from nipype2_tmp; changing State.state_value so it returns values (and not arrays) --- nipype/pipeline/engine/state.py | 6 +-- nipype/pipeline/engine/workflows.py | 66 +++++++++++++++++++++++++++-- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/nipype/pipeline/engine/state.py b/nipype/pipeline/engine/state.py index 68d82137f0..ec96dab04c 100644 --- a/nipype/pipeline/engine/state.py +++ b/nipype/pipeline/engine/state.py @@ -1,4 +1,5 @@ from collections import OrderedDict +import pdb from . import auxiliary as aux @@ -73,13 +74,12 @@ def state_values(self, ind): # checking which axes are important for the input sl_ax = slice(ax[0], ax[-1]+1) # taking the indexes for the axes - ind_inp = ind[sl_ax] + ind_inp = tuple(ind[sl_ax]) #used to be list state_dict[input] = self.state_inputs[input][ind_inp] - # adding values from input that are not used in the mapper for input in set(self._input_names) - set(self._input_names_mapper): state_dict[input] = self.state_inputs[input] # in py3.7 we can skip OrderedDict # returning a named tuple? - return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0])) \ No newline at end of file + return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0])) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index c0e253c0e3..ee2fa93c98 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -35,6 +35,10 @@ from .base import EngineBase from .nodes import MapNode, Node +from . import state +from . import auxiliary as aux + + # Py2 compat: http://python-future.org/compatible_idioms.html#collections-counter-and-ordereddict from future import standard_library @@ -1066,16 +1070,70 @@ class MapState(object): pass class NewNode(EngineBase): - def __init__(self, inputs={}, map_on=None, join_by=None, + def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, *args, **kwargs): - self._mappers = {} + self.name = name + # dj: do I need a state_input and state_mapper?? 
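+        # Inputs are stored under mangled keys "<node name>-<field name>"
+        # (e.g. inputs={"a": 3} on node "NA" becomes {"NA-a": 3}), and list
+        # values are converted to numpy arrays so that State can index them.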
+ # dj: reading the input from files should be added + if inputs: + # adding name of the node to the input name + self._inputs = dict(("{}-{}".format(self.name, key), value) for (key, value) in inputs.items()) + self._inputs = dict((key, np.array(val)) if type(val) is list else (key, val) + for (key, val) in self._inputs.items()) + else: + self._inputs = {} + if mapper: + # adding name of the node to the input name within the mapper + mapper = aux.change_mapper(mapper, self.name) + self._mapper = mapper + # create state (takes care of mapper, connects inputs with axes, so we can ask for specifc element) + self.state = state.State(state_inputs=self._inputs, mapper=self._mapper, node_name=self.name) + + # adding interface: i'm using Function Interface from aux that has input_map that can change the name of arguments + self._interface = interface + self._interface.input_map = dict((key, "{}-{}".format(self.name, value)) + for (key, value) in self._interface.input_map.items()) + self._joiners = {} - def map(self, field, values=None): + + @property + def mapper(self): + return self._mapper + + + @property + def inputs(self): + return self._inputs + + #@inputs.setter + #def inputs(self, inputs): + # self._inputs = dict(("{}-{}".format(self.name, key), value) for (key, value) in inputs.items()) + # self.state_inputs = self._inputs.copy() + + + def map(self, mapper, inputs=None): + if self._mapper: + raise Exception("mapper is already set") + else: + self._mapper = aux.change_mapper(mapper, self.name) + + if inputs: + inputs = dict(("{}-{}".format(self.name, key), value) for (key, value) in inputs.items()) + inputs = dict((key, np.array(val)) if type(val) is list else (key, val) + for (key, val) in inputs.items()) + self._inputs.update(inputs) + + self.state = state.State(state_inputs=self._inputs, mapper=self._mapper, node_name=self.name) + + + + def map_orig(self, field, values=None): if isinstance(field, list): - for field_ + #for field_ #dj if values is not None: if len(values != len(field)): + pass #dj elif isinstance(field, tuple): pass if values is None: From c9446851df93305767c705b4228e7872edc27b77 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Sat, 14 Jul 2018 08:38:20 -0400 Subject: [PATCH 06/16] adding simple tests --- nipype/pipeline/engine/tests/test_newnode.py | 61 ++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 nipype/pipeline/engine/tests/test_newnode.py diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py new file mode 100644 index 0000000000..98fab21630 --- /dev/null +++ b/nipype/pipeline/engine/tests/test_newnode.py @@ -0,0 +1,61 @@ +from .. 
import NewNode +from ..auxiliary import Function_Interface + +import numpy as np +import pytest, pdb + +def fun_addtwo(a): + return a + 2 + +interf_addtwo = Function_Interface(fun_addtwo, ["out"]) + +def test_node_1(): + """Node with only mandatory arguments""" + nn = NewNode(name="N_A", interface=interf_addtwo) + assert nn.mapper is None + assert nn.inputs == {} + assert nn.state._mapper is None + + +def test_node_2(): + """Node with interface and inputs""" + nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": 3}) + assert nn.mapper is None + assert nn.inputs == {"N_A-a": 3} + assert nn.state._mapper is None + + +def test_node_3(): + """Node with interface and inputs""" + nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": [3, 5]}, mapper="a") + assert nn.mapper == "N_A-a" + assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "N_A-a" + assert nn.state.state_values([0]) == {"N_A-a": 3} + assert nn.state.state_values([1]) == {"N_A-a": 5} + + +def test_node_4(): + """Node with interface and inputs""" + nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": [3, 5]}) + nn.map(mapper="a") + assert nn.mapper == "N_A-a" + assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "N_A-a" + assert nn.state.state_values([0]) == {"N_A-a": 3} + assert nn.state.state_values([1]) == {"N_A-a": 5} + + +def test_node_5(): + """Node with interface and inputs""" + nn = NewNode(name="N_A", interface=interf_addtwo) + nn.map(mapper="a", inputs={"a": [3, 5]}) + assert nn.mapper == "N_A-a" + assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "N_A-a" + assert nn.state.state_values([0]) == {"N_A-a": 3} + assert nn.state.state_values([1]) == {"N_A-a": 5} + pdb.set_trace() From 4da873728545fcca2a5f1d038feb5074dc0c1b1c Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Sat, 14 Jul 2018 23:00:15 -0400 Subject: [PATCH 07/16] adding run_interface method for NewNode (still have to add run method) --- nipype/pipeline/engine/tests/test_newnode.py | 15 +++- nipype/pipeline/engine/workflows.py | 75 ++++++++++++++++---- 2 files changed, 74 insertions(+), 16 deletions(-) diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py index 98fab21630..4a908fe03c 100644 --- a/nipype/pipeline/engine/tests/test_newnode.py +++ b/nipype/pipeline/engine/tests/test_newnode.py @@ -58,4 +58,17 @@ def test_node_5(): assert nn.state._mapper == "N_A-a" assert nn.state.state_values([0]) == {"N_A-a": 3} assert nn.state.state_values([1]) == {"N_A-a": 5} - pdb.set_trace() + + +def test_node_6(): + """Node with interface and inputs, running interface""" + nn = NewNode(name="N_A", interface=interf_addtwo, base_dir="test6") + nn.map(mapper="a", inputs={"a": [3, 5]}) + assert nn.mapper == "N_A-a" + assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "N_A-a" + + nn.run_interface_el(0, (0,)) + + diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index ee2fa93c98..ab68627dcf 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -1071,8 +1071,10 @@ class MapState(object): class NewNode(EngineBase): def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, - *args, **kwargs): - self.name = name + base_dir=None, *args, **kwargs): + super(NewNode, self).__init__(name=name, base_dir=base_dir) + # dj: should be changed for wf + self.nodedir = self.base_dir # dj: do I 
need a state_input and state_mapper?? # dj: reading the input from files should be added if inputs: @@ -1094,6 +1096,7 @@ def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, self._interface.input_map = dict((key, "{}-{}".format(self.name, value)) for (key, value) in self._interface.input_map.items()) + self.needed_outputs = [] self._joiners = {} @@ -1128,24 +1131,66 @@ def map(self, mapper, inputs=None): - def map_orig(self, field, values=None): - if isinstance(field, list): - #for field_ #dj - if values is not None: - if len(values != len(field)): - pass #dj - elif isinstance(field, tuple): - pass - if values is None: - values = getattr(self._inputs, field) - if values is None: - raise MappingError('Cannot map unassigned input field') - self._mappers[field] = values +# def map_orig(self, field, values=None): +# if isinstance(field, list): +# for field_ +# if values is not None: +# if len(values != len(field)): +# elif isinstance(field, tuple): +# pass +# if values is None: +# values = getattr(self._inputs, field) +# if values is None: +# raise MappingError('Cannot map unassigned input field') +# self._mappers[field] = values + # TBD def join(self, field): pass + def run_interface_el(self, i, ind, single_node=False): + """ running interface one element generated from node_state.""" + logger.debug("Run interface el, name={}, i={}, ind={}".format(self.name, i, ind)) + if not single_node: # if we run a single node, we don't have to collect output + state_dict, inputs_dict = self._collecting_input_el(ind) + logger.debug("Run interface el, name={}, inputs_dict={}, state_dict={}".format( + self.name, inputs_dict, state_dict)) + res = self._interface.run(inputs_dict) + #pdb.set_trace() + output = self._interface.output + logger.debug("Run interface el, output={}".format(output)) + dir_nm_el = "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items())]) + # TODO when join + #if self._joinByKey: + # dir_join = "join_" + "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items()) if i not in self._joinByKey]) + #elif self._join: + # dir_join = "join_" + #if self._joinByKey or self._join: + # os.makedirs(os.path.join(self.nodedir, dir_join), exist_ok=True) + # dir_nm_el = os.path.join(dir_join, dir_nm_el) + os.makedirs(os.path.join(self.nodedir, dir_nm_el), exist_ok=True) + for key_out in list(output.keys()): + with open(os.path.join(self.nodedir, dir_nm_el, key_out+".txt"), "w") as fout: + fout.write(str(output[key_out])) + return res + + # dj: this is not used for a single node + def _collecting_input_el(self, ind): + state_dict = self.state.state_values(ind) + inputs_dict = {k: state_dict[k] for k in self._inputs.keys()} + # reading extra inputs that come from previous nodes + for (from_node, from_socket, to_socket) in self.needed_outputs: + dir_nm_el_from = "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items()) + if i in list(from_node.state_inputs.keys())]) + file_from = os.path.join(from_node.nodedir, dir_nm_el_from, from_socket+".txt") + with open(file_from) as f: + inputs_dict["{}-{}".format(self.name, to_socket)] = eval(f.readline()) + return state_dict, inputs_dict + + + + class NewWorkflow(NewNode): def __init__(self, inputs={}, *args, **kwargs): super(NewWorkflow, self).__init__(*args, **kwargs) From 7ff7722c61d6376293de75843989ae9ae5b2cbe4 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Sun, 15 Jul 2018 00:26:35 -0400 Subject: [PATCH 08/16] copy submitter and workers from nipype2_tmp; adding SubmitterNode and SubmitterWorkflow 
class; simple test for NewNode.run using SubmitterNode --- nipype/pipeline/engine/submitter.py | 150 +++++++++++++++++++ nipype/pipeline/engine/tests/test_newnode.py | 5 +- nipype/pipeline/engine/workers.py | 99 ++++++++++++ nipype/pipeline/engine/workflows.py | 9 +- 4 files changed, 259 insertions(+), 4 deletions(-) create mode 100644 nipype/pipeline/engine/submitter.py create mode 100644 nipype/pipeline/engine/workers.py diff --git a/nipype/pipeline/engine/submitter.py b/nipype/pipeline/engine/submitter.py new file mode 100644 index 0000000000..3035b79f47 --- /dev/null +++ b/nipype/pipeline/engine/submitter.py @@ -0,0 +1,150 @@ +from __future__ import print_function, division, unicode_literals, absolute_import +from builtins import object +from collections import defaultdict + +from future import standard_library +standard_library.install_aliases() + +import os, pdb, time, glob +import itertools, collections +import queue + +from .workers import MpWorker, SerialWorker, DaskWorker, ConcurrentFuturesWorker + +from ... import config, logging +logger = logging.getLogger('nipype.workflow') + +class Submitter(object): + def __init__(self, plugin): + self.plugin = plugin + self.node_line = [] + if self.plugin == "mp": + self.worker = MpWorker() + elif self.plugin == "serial": + self.worker = SerialWorker() + elif self.plugin == "dask": + self.worker = DaskWorker() + elif self.plugin == "cf": + self.worker = ConcurrentFuturesWorker() + else: + raise Exception("plugin {} not available".format(self.plugin)) + + + def submit_work(self, node): + for (i, ind) in enumerate(itertools.product(*node.state.all_elements)): + self._submit_work_el(node, i, ind) + + def _submit_work_el(self, node, i, ind): + logger.debug("SUBMIT WORKER, node: {}, ind: {}".format(node, ind)) + self.worker.run_el(node.run_interface_el, (i, ind)) + + + def close(self): + self.worker.close() + + + +class SubmitterNode(Submitter): + def __init__(self, plugin, node): + super(SubmitterNode, self).__init__(plugin) + self.node = node + + def run_node(self): + self.submit_work(self.node) + + +class SubmitterWorkflow(Submitter): + def __init__(self, graph, plugin): + super(SubmitterWorkflow, self).__init_(plugin) + self.graph = graph + logger.debug('Initialize Submitter, graph: {}'.format(graph)) + self._to_finish = list(self.graph) + + + def run_workflow(self): + for (i_n, node) in enumerate(self.graph): + # submitting all the nodes who are self sufficient (self.graph is already sorted) + if node.sufficient: + self.submit_work(node) + # if its not, its been added to a line + else: + break + + # in case there is no element in the graph that goes to the break + # i want to be sure that not calculating the last node again in the next for loop + if i_n == len(self.graph) - 1: + i_n += 1 + + # adding task for reducer + if node._join_interface: + # decided to add it as one task, since I have to wait for everyone before can start it anyway + self.node_line.append((node, "join", None)) + + + # all nodes that are not self sufficient will go to the line + # iterating over all elements + # (i think ordered list work well here, since it's more efficient to check within a specific order) + for nn in self.graph[i_n:]: + for (i, ind) in enumerate(itertools.product(*nn.state.all_elements)): + self.node_line.append((nn, i, ind)) + if nn._join_interface: + # decided to add it as one task, since I have to wait for everyone before can start it anyway + self.node_line.append((nn, "join", None)) + + + # this parts submits nodes that are waiting to be run + 
# it should stop when nothing is waiting + while self._nodes_check(): + logger.debug("Submitter, in while, node_line: {}".format(self.node_line)) + time.sleep(3) + + # TODO(?): combining two while together + # this part simply waiting for all "last nodes" to finish + while self._output_check(): + logger.debug("Submitter, in while, to_finish: {}".format(self._to_finish)) + time.sleep(3) + + + # for now without callback, so checking all nodes(with ind) in some order + def _nodes_check(self): + _to_remove = [] + for (to_node, i, ind) in self.node_line: + if i == "join": + if to_node.global_done: #have to check if interface has finished + self.submit_join_work(to_node) + _to_remove.append((to_node, i, ind)) + else: + pass + else: + if to_node.checking_input_el(ind): + self._submit_work_el(to_node, i, ind) + _to_remove.append((to_node, i, ind)) + else: + pass + # can't remove during iterating + for rn in _to_remove: + self.node_line.remove(rn) + return self.node_line + + + # this I believe can be done for entire node + def _output_check(self): + _to_remove = [] + for node in self._to_finish: + print("_output check node", node,node.global_done, node._join_interface, node._global_done_join ) + if node.global_done: + if node._join_interface: + if node.global_done_join: + _to_remove.append(node) + else: + _to_remove.append(node) + for rn in _to_remove: + self._to_finish.remove(rn) + return self._to_finish + + + def submit_join_work(self, node): + logger.debug("SUBMIT JOIN WORKER, node: {}".format(node)) + for (state_redu, res_redu) in node.result[node._join_interface_input]: # TODO, should be more general than out + res_redu_l = [i[1] for i in res_redu] + self.worker.run_el(node.run_interface_join_el, (state_redu, res_redu_l)) diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py index 4a908fe03c..a2b59ff45e 100644 --- a/nipype/pipeline/engine/tests/test_newnode.py +++ b/nipype/pipeline/engine/tests/test_newnode.py @@ -69,6 +69,5 @@ def test_node_6(): assert nn.state._mapper == "N_A-a" - nn.run_interface_el(0, (0,)) - - + # testing if the run method works + nn.run() diff --git a/nipype/pipeline/engine/workers.py b/nipype/pipeline/engine/workers.py new file mode 100644 index 0000000000..2e7b83608e --- /dev/null +++ b/nipype/pipeline/engine/workers.py @@ -0,0 +1,99 @@ +from __future__ import print_function, division, unicode_literals, absolute_import +from builtins import object +from collections import defaultdict + +from future import standard_library +standard_library.install_aliases() + +from copy import deepcopy +import re, os, pdb, time +import multiprocessing as mp +#import multiprocess as mp +import itertools + +#from pycon_utils import make_cluster +from dask.distributed import Client + +import concurrent.futures as cf + +from ... 
import config, logging +logger = logging.getLogger('nipype.workflow') + + +class Worker(object): + def __init__(self): + logger.debug("Initialize Worker") + pass + + def run_el(self): + raise NotImplementedError + + def close(self): + raise NotImplementedError + + +class MpWorker(Worker): + def __init__(self, nr_proc=4): #should be none + self.nr_proc = nr_proc + self.pool = mp.Pool(processes=self.nr_proc) + logger.debug('Initialize MpWorker') + + def run_el(self, interface, inp): + self.pool.apply_async(interface, (inp[0], inp[1])) + + def close(self): + # added this method since I was having somtetimes problem with reading results from (existing) files + # i thought that pool.close() should work, but still was getting some errors, so testing terminate + self.pool.terminate() + + +class SerialWorker(Worker): + def __init__(self): + logger.debug("Initialize SerialWorker") + pass + + def run_el(self, interface, inp): + interface(inp[0], inp[1]) + + def close(self): + pass + + +class ConcurrentFuturesWorker(Worker): + def __init__(self, nr_proc=4): + self.nr_proc = nr_proc + self.pool = cf.ProcessPoolExecutor(self.nr_proc) + logger.debug('Initialize ConcurrentFuture') + + def run_el(self, interface, inp): + x = self.pool.submit(interface, inp[0], inp[1]) + #print("X, DONE", x.done()) + x.add_done_callback(lambda x: print("DONE ", interface, inp, x.done)) + #print("DIR", x.result()) + + def close(self): + self.pool.shutdown() + + +class DaskWorker(Worker): + def __init__(self): + from distributed.deploy.local import LocalCluster + logger.debug("Initialize Dask Worker") + #self.cluster = LocalCluster() + self.client = Client()#self.cluster) + print("BOKEH", self.client.scheduler_info()["address"] + ":" + str(self.client.scheduler_info()["services"]["bokeh"])) + + + def run_el(self, interface, inp): + print("DASK, run_el: ", interface, inp) + x = self.client.submit(interface, inp[0], inp[1]) + print("DASK, status: ", x.status) + # this important, otherwise dask will not finish the job + x.add_done_callback(lambda x: print("DONE ", interface, inp)) + #print("res", x.result()) + + + def close(self): + #self.cluster.close() + self.client.close() + diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index ab68627dcf..913375108c 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -37,7 +37,7 @@ from .nodes import MapNode, Node from . import state from . import auxiliary as aux - +from . import submitter as sub # Py2 compat: http://python-future.org/compatible_idioms.html#collections-counter-and-ordereddict @@ -1149,6 +1149,13 @@ def join(self, field): pass + def run(self, plugin="serial"): + self.sub = sub.SubmitterNode(plugin, node=self) #dj: ? 
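+        # SubmitterNode wraps the worker backend selected by `plugin`
+        # ("serial", "mp", "cf" or "dask", as defined in workers.py);
+        # run_node below submits one run_interface_el call per element
+        # of the node's state.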
+ self.sub.run_node() + self.sub.close() + + + def run_interface_el(self, i, ind, single_node=False): """ running interface one element generated from node_state.""" logger.debug("Run interface el, name={}, i={}, ind={}".format(self.name, i, ind)) From 502f6170363a3bb3175b6d64862ae81802ee4f4f Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 16 Jul 2018 09:27:21 -0400 Subject: [PATCH 09/16] fixing run method for node (previously it didn work for dask and cf): had to create a new class and have now NewNodeBase and NewNode that is a wrapper with the run method --- nipype/pipeline/engine/submitter.py | 4 + nipype/pipeline/engine/tests/test_newnode.py | 21 +++-- nipype/pipeline/engine/workflows.py | 92 ++++++++++++++++---- 3 files changed, 95 insertions(+), 22 deletions(-) diff --git a/nipype/pipeline/engine/submitter.py b/nipype/pipeline/engine/submitter.py index 3035b79f47..d04c29b550 100644 --- a/nipype/pipeline/engine/submitter.py +++ b/nipype/pipeline/engine/submitter.py @@ -52,6 +52,10 @@ def __init__(self, plugin, node): def run_node(self): self.submit_work(self.node) + while not self.node.global_done: + logger.debug("Submitter, in while, to_finish: {}".format(self.node)) + time.sleep(3) + class SubmitterWorkflow(Submitter): def __init__(self, graph, plugin): diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py index a2b59ff45e..04b20d9983 100644 --- a/nipype/pipeline/engine/tests/test_newnode.py +++ b/nipype/pipeline/engine/tests/test_newnode.py @@ -7,10 +7,10 @@ def fun_addtwo(a): return a + 2 -interf_addtwo = Function_Interface(fun_addtwo, ["out"]) def test_node_1(): """Node with only mandatory arguments""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) nn = NewNode(name="N_A", interface=interf_addtwo) assert nn.mapper is None assert nn.inputs == {} @@ -19,6 +19,7 @@ def test_node_1(): def test_node_2(): """Node with interface and inputs""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": 3}) assert nn.mapper is None assert nn.inputs == {"N_A-a": 3} @@ -27,6 +28,7 @@ def test_node_2(): def test_node_3(): """Node with interface and inputs""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": [3, 5]}, mapper="a") assert nn.mapper == "N_A-a" assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() @@ -38,6 +40,7 @@ def test_node_3(): def test_node_4(): """Node with interface and inputs""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": [3, 5]}) nn.map(mapper="a") assert nn.mapper == "N_A-a" @@ -50,6 +53,7 @@ def test_node_4(): def test_node_5(): """Node with interface and inputs""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) nn = NewNode(name="N_A", interface=interf_addtwo) nn.map(mapper="a", inputs={"a": [3, 5]}) assert nn.mapper == "N_A-a" @@ -60,14 +64,17 @@ def test_node_5(): assert nn.state.state_values([1]) == {"N_A-a": 5} -def test_node_6(): +Plugins = ["mp", "serial", "cf", "dask"] + +@pytest.mark.parametrize("plugin", Plugins) +def test_node_6(plugin): """Node with interface and inputs, running interface""" - nn = NewNode(name="N_A", interface=interf_addtwo, base_dir="test6") + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) + nn = NewNode(name="N_A", interface=interf_addtwo, base_dir="test6_{}".format(plugin)) nn.map(mapper="a", inputs={"a": [3, 5]}) + assert nn.mapper == 
"N_A-a" assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() - assert nn.state._mapper == "N_A-a" - - # testing if the run method works - nn.run() + # testing if the node runs properly + nn.run(plugin=plugin) diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index 913375108c..a93ceca885 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -20,6 +20,7 @@ import numpy as np import networkx as nx +import itertools from ... import config, logging from ...exceptions import NodeError, WorkflowError, MappingError, JoinError @@ -39,6 +40,7 @@ from . import auxiliary as aux from . import submitter as sub +import pdb # Py2 compat: http://python-future.org/compatible_idioms.html#collections-counter-and-ordereddict from future import standard_library @@ -1069,10 +1071,10 @@ class Join(Node): class MapState(object): pass -class NewNode(EngineBase): +class NewNodeBase(EngineBase): def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, base_dir=None, *args, **kwargs): - super(NewNode, self).__init__(name=name, base_dir=base_dir) + super(NewNodeBase, self).__init__(name=name, base_dir=base_dir) # dj: should be changed for wf self.nodedir = self.base_dir # dj: do I need a state_input and state_mapper?? @@ -1097,6 +1099,9 @@ def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, for (key, value) in self._interface.input_map.items()) self.needed_outputs = [] + self._out_nm = self._interface._output_nm + self._global_done = False + self._joiners = {} @@ -1109,11 +1114,6 @@ def mapper(self): def inputs(self): return self._inputs - #@inputs.setter - #def inputs(self, inputs): - # self._inputs = dict(("{}-{}".format(self.name, key), value) for (key, value) in inputs.items()) - # self.state_inputs = self._inputs.copy() - def map(self, mapper, inputs=None): if self._mapper: @@ -1126,7 +1126,6 @@ def map(self, mapper, inputs=None): inputs = dict((key, np.array(val)) if type(val) is list else (key, val) for (key, val) in inputs.items()) self._inputs.update(inputs) - self.state = state.State(state_inputs=self._inputs, mapper=self._mapper, node_name=self.name) @@ -1149,13 +1148,6 @@ def join(self, field): pass - def run(self, plugin="serial"): - self.sub = sub.SubmitterNode(plugin, node=self) #dj: ? 
- self.sub.run_node() - self.sub.close() - - - def run_interface_el(self, i, ind, single_node=False): """ running interface one element generated from node_state.""" logger.debug("Run interface el, name={}, i={}, ind={}".format(self.name, i, ind)) @@ -1197,6 +1189,76 @@ def _collecting_input_el(self, ind): + # checking if all outputs are saved + @property + def global_done(self): + # once _global_done os True, this should not change + logger.debug('global_done {}'.format(self._global_done)) + if self._global_done: + return self._global_done + else: + return self._check_all_results() + + # dj: version without join + def _check_all_results(self): + # checking if all files that should be created are present + for ind in itertools.product(*self.state._all_elements): + state_dict = self.state.state_values(ind) + dir_nm_el = "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items())]) + for key_out in self._out_nm: + if not os.path.isfile(os.path.join(self.nodedir, dir_nm_el, key_out+".txt")): + return False + self._global_done = True + return True + + + + +class NewNode(object): + """wrapper around NewNodeBase, mostly have run method """ + def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, + base_dir=None, *args, **kwargs): + self.node = NewNodeBase(name, interface, inputs, mapper, join_by, + base_dir, *args, **kwargs) + # dj: might want to use a one element graph + #self.graph = nx.DiGraph() + #self.graph.add_nodes_from([self.node]) + + + def map(self, mapper, inputs=None): + self.node.map(mapper, inputs) + + @property + def state(self): + return self.node.state + + + @property + def mapper(self): + return self.node.mapper + + + @property + def inputs(self): + return self.node._inputs + + @property + def global_done(self): + return self.node._global_done + + + @property + def outputs(self): + return self.node.outputs + + + def run(self, plugin="serial"): + self.sub = sub.SubmitterNode(plugin, node=self.node) + self.sub.run_node() + self.sub.close() + + + class NewWorkflow(NewNode): def __init__(self, inputs={}, *args, **kwargs): From 784ec2269881914e32ec9cbfeedc502f3cc7a4f7 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 16 Jul 2018 10:06:52 -0400 Subject: [PATCH 10/16] adding reading results; changing node names (node name shouldn have _) --- nipype/pipeline/engine/tests/test_newnode.py | 54 +++++++++++--------- nipype/pipeline/engine/workflows.py | 40 ++++++++++++++- 2 files changed, 68 insertions(+), 26 deletions(-) diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py index 04b20d9983..935a1c414d 100644 --- a/nipype/pipeline/engine/tests/test_newnode.py +++ b/nipype/pipeline/engine/tests/test_newnode.py @@ -11,7 +11,7 @@ def fun_addtwo(a): def test_node_1(): """Node with only mandatory arguments""" interf_addtwo = Function_Interface(fun_addtwo, ["out"]) - nn = NewNode(name="N_A", interface=interf_addtwo) + nn = NewNode(name="NA", interface=interf_addtwo) assert nn.mapper is None assert nn.inputs == {} assert nn.state._mapper is None @@ -20,48 +20,48 @@ def test_node_1(): def test_node_2(): """Node with interface and inputs""" interf_addtwo = Function_Interface(fun_addtwo, ["out"]) - nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": 3}) + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": 3}) assert nn.mapper is None - assert nn.inputs == {"N_A-a": 3} + assert nn.inputs == {"NA-a": 3} assert nn.state._mapper is None def test_node_3(): """Node with interface and inputs""" 
interf_addtwo = Function_Interface(fun_addtwo, ["out"]) - nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": [3, 5]}, mapper="a") - assert nn.mapper == "N_A-a" - assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": [3, 5]}, mapper="a") + assert nn.mapper == "NA-a" + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() - assert nn.state._mapper == "N_A-a" - assert nn.state.state_values([0]) == {"N_A-a": 3} - assert nn.state.state_values([1]) == {"N_A-a": 5} + assert nn.state._mapper == "NA-a" + assert nn.state.state_values([0]) == {"NA-a": 3} + assert nn.state.state_values([1]) == {"NA-a": 5} def test_node_4(): """Node with interface and inputs""" interf_addtwo = Function_Interface(fun_addtwo, ["out"]) - nn = NewNode(name="N_A", interface=interf_addtwo, inputs={"a": [3, 5]}) + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": [3, 5]}) nn.map(mapper="a") - assert nn.mapper == "N_A-a" - assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() + assert nn.mapper == "NA-a" + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() - assert nn.state._mapper == "N_A-a" - assert nn.state.state_values([0]) == {"N_A-a": 3} - assert nn.state.state_values([1]) == {"N_A-a": 5} + assert nn.state._mapper == "NA-a" + assert nn.state.state_values([0]) == {"NA-a": 3} + assert nn.state.state_values([1]) == {"NA-a": 5} def test_node_5(): """Node with interface and inputs""" interf_addtwo = Function_Interface(fun_addtwo, ["out"]) - nn = NewNode(name="N_A", interface=interf_addtwo) + nn = NewNode(name="NA", interface=interf_addtwo) nn.map(mapper="a", inputs={"a": [3, 5]}) - assert nn.mapper == "N_A-a" - assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() + assert nn.mapper == "NA-a" + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() - assert nn.state._mapper == "N_A-a" - assert nn.state.state_values([0]) == {"N_A-a": 3} - assert nn.state.state_values([1]) == {"N_A-a": 5} + assert nn.state._mapper == "NA-a" + assert nn.state.state_values([0]) == {"NA-a": 3} + assert nn.state.state_values([1]) == {"NA-a": 5} Plugins = ["mp", "serial", "cf", "dask"] @@ -70,11 +70,17 @@ def test_node_5(): def test_node_6(plugin): """Node with interface and inputs, running interface""" interf_addtwo = Function_Interface(fun_addtwo, ["out"]) - nn = NewNode(name="N_A", interface=interf_addtwo, base_dir="test6_{}".format(plugin)) + nn = NewNode(name="NA", interface=interf_addtwo, base_dir="test6_{}".format(plugin)) nn.map(mapper="a", inputs={"a": [3, 5]}) - assert nn.mapper == "N_A-a" - assert (nn.inputs["N_A-a"] == np.array([3, 5])).all() + assert nn.mapper == "NA-a" + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() # testing if the node runs properly nn.run(plugin=plugin) + + # checking teh results + expected = [({"NA-a": 3}, 5), ({"NA-a": 5}, 7)] + for i, res in enumerate(expected): + assert nn.result["out"][i][0] == res[0] + assert nn.result["out"][i][1] == res[1] diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index a93ceca885..fb85343d72 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -10,7 +10,7 @@ absolute_import) from builtins import str, bytes, open -import os +import os, glob import os.path as op import sys from datetime import datetime @@ -20,7 +20,7 @@ import numpy as np import networkx as nx -import itertools +import itertools, collections from ... 
import config, logging from ...exceptions import NodeError, WorkflowError, MappingError, JoinError @@ -1101,6 +1101,7 @@ def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, self.needed_outputs = [] self._out_nm = self._interface._output_nm self._global_done = False + self._result = {} self._joiners = {} @@ -1212,6 +1213,36 @@ def _check_all_results(self): return True + # reading results (without join for now) + @property + def result(self): + if not self._result: + self._reading_results() + return self._result + + + def _reading_results(self): + """ + reading results from file, + doesn't check if everything is ready, i.e. if self.global_done""" + for key_out in self._out_nm: + self._result[key_out] = [] + if self.inputs: #self.state_inputs: + files = [name for name in glob.glob("{}/*/{}.txt".format(self.nodedir, key_out))] + for file in files: + st_el = file.split(os.sep)[-2].split("_") + st_dict = collections.OrderedDict([(el.split(".")[0], eval(el.split(".")[1])) + for el in st_el]) + with open(file) as fout: + logger.debug('Reading Results: file={}, st_dict={}'.format(file, st_dict)) + self._result[key_out].append((st_dict, eval(fout.readline()))) + # for nodes without input + else: + files = [name for name in glob.glob("{}/{}.txt".format(self.nodedir, key_out))] + with open(files[0]) as fout: + self._result[key_out].append(({}, eval(fout.readline()))) + + class NewNode(object): @@ -1252,6 +1283,11 @@ def outputs(self): return self.node.outputs + @property + def result(self): + return self.node.result + + def run(self, plugin="serial"): self.sub = sub.SubmitterNode(plugin, node=self.node) self.sub.run_node() From 18a617612467a7680aad56b78ead962611ad374c Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 16 Jul 2018 13:04:30 -0400 Subject: [PATCH 11/16] adding dask to requirements --- nipype/info.py | 1 + requirements.txt | 1 + rtd_requirements.txt | 1 + 3 files changed, 3 insertions(+) diff --git a/nipype/info.py b/nipype/info.py index a371b70e2e..c8e384f7b0 100644 --- a/nipype/info.py +++ b/nipype/info.py @@ -148,6 +148,7 @@ def get_nipype_gitversion(): 'pydot>=%s' % PYDOT_MIN_VERSION, 'packaging', 'futures; python_version == "2.7"', + 'dask', ] if sys.version_info <= (3, 4): diff --git a/requirements.txt b/requirements.txt index 5ef00ec98b..602be4c50a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ mock pydotplus pydot>=1.2.3 packaging +dask diff --git a/rtd_requirements.txt b/rtd_requirements.txt index 68a366bbdf..6a84e7c2aa 100644 --- a/rtd_requirements.txt +++ b/rtd_requirements.txt @@ -17,3 +17,4 @@ psutil matplotlib packaging numpydoc +dask From 601c15bcd4f27b236f34259a4c40cfd51a1c67c1 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 16 Jul 2018 17:50:10 -0400 Subject: [PATCH 12/16] adding distributed package --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 602be4c50a..6daeee21ab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ pydotplus pydot>=1.2.3 packaging dask +distributed From d5ceb2481db73670e951cf5ccb69378990691ee4 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 16 Jul 2018 18:37:41 -0400 Subject: [PATCH 13/16] skipping tests for py2; sorting results (not sure if the node should be keep the order) --- nipype/pipeline/engine/tests/test_newnode.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py index 
935a1c414d..501e2fd871 100644
--- a/nipype/pipeline/engine/tests/test_newnode.py
+++ b/nipype/pipeline/engine/tests/test_newnode.py
@@ -1,9 +1,14 @@
 from .. import NewNode
 from ..auxiliary import Function_Interface
+import sys
 import numpy as np
 import pytest, pdb
+python3_only = pytest.mark.skipif(sys.version_info < (3, 0),
+                                  reason="requires Python3")
+
+
 def fun_addtwo(a):
     return a + 2
@@ -67,6 +72,7 @@ def test_node_5():
 Plugins = ["mp", "serial", "cf", "dask"]
 @pytest.mark.parametrize("plugin", Plugins)
+@python3_only
 def test_node_6(plugin):
     """Node with interface and inputs, running interface"""
     interf_addtwo = Function_Interface(fun_addtwo, ["out"])
@@ -81,6 +87,10 @@ def test_node_6(plugin):
     # checking the results
     expected = [({"NA-a": 3}, 5), ({"NA-a": 5}, 7)]
+    # to be sure that there is the same order (not sure if node itself should keep the order)
+    key_sort = list(expected[0][0].keys())
+    expected.sort(key=lambda t: [t[0][key] for key in key_sort])
+    nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])
     for i, res in enumerate(expected):
         assert nn.result["out"][i][0] == res[0]
         assert nn.result["out"][i][1] == res[1]

From 692bc1b305f3d510f07bb9ba376bed006c4ce53d Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Mon, 16 Jul 2018 21:52:56 -0400
Subject: [PATCH 14/16] removing printing the bokeh address (doesn't work with
 Travis)

---
 nipype/pipeline/engine/workers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nipype/pipeline/engine/workers.py b/nipype/pipeline/engine/workers.py
index 2e7b83608e..d20a12725c 100644
--- a/nipype/pipeline/engine/workers.py
+++ b/nipype/pipeline/engine/workers.py
@@ -81,7 +81,7 @@ def __init__(self):
         logger.debug("Initialize Dask Worker")
         #self.cluster = LocalCluster()
         self.client = Client()#self.cluster)
-        print("BOKEH", self.client.scheduler_info()["address"] + ":" + str(self.client.scheduler_info()["services"]["bokeh"]))
+        #print("BOKEH", self.client.scheduler_info()["address"] + ":" + str(self.client.scheduler_info()["services"]["bokeh"]))

     def run_el(self, interface, inp):

From ccf89264960a4b8a78b21140ef8d7a10ace3aebc Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Tue, 17 Jul 2018 16:32:46 -0400
Subject: [PATCH 15/16] adding tests for node mappers

---
 nipype/pipeline/engine/tests/test_newnode.py | 58 ++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py
index 501e2fd871..d6c79f4b30 100644
--- a/nipype/pipeline/engine/tests/test_newnode.py
+++ b/nipype/pipeline/engine/tests/test_newnode.py
@@ -13,6 +13,11 @@ def fun_addtwo(a):
     return a + 2
+def fun_addvar(a, b):
+    return a + b
+
+
+
 def test_node_1():
     """Node with only mandatory arguments"""
     interf_addtwo = Function_Interface(fun_addtwo, ["out"])
@@ -94,3 +99,56 @@ def test_node_6(plugin):
     for i, res in enumerate(expected):
         assert nn.result["out"][i][0] == res[0]
         assert nn.result["out"][i][1] == res[1]
+
+
+@pytest.mark.parametrize("plugin", Plugins)
+@python3_only
+def test_node_7(plugin):
+    """Node with interface and inputs, running interface"""
+    interf_addvar = Function_Interface(fun_addvar, ["out"])
+    nn = NewNode(name="NA", interface=interf_addvar, base_dir="test7_{}".format(plugin))
+    nn.map(mapper=("a", "b"), inputs={"a": [3, 5], "b": [2, 1]})
+
+    assert nn.mapper == ("NA-a", "NA-b")
+    assert (nn.inputs["NA-a"] == np.array([3, 5])).all()
+    assert (nn.inputs["NA-b"] == np.array([2, 1])).all()
+
+    # testing if the node runs properly
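+    # ("a", "b") is the scalar (dot) mapper: inputs are paired elementwise,
+    # so only (a=3, b=2) and (a=5, b=1) are run; ["a", "b"] in test_node_8
+    # below is the cross-product mapper and runs all four combinations.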
+    nn.run(plugin=plugin)
+
+    # checking the results
+    expected = [({"NA-a": 3, "NA-b": 2}, 5), ({"NA-a": 5, "NA-b": 1}, 6)]
+    # to be sure that there is the same order (not sure if node itself should keep the order)
+    key_sort = list(expected[0][0].keys())
+    expected.sort(key=lambda t: [t[0][key] for key in key_sort])
+    nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])
+    for i, res in enumerate(expected):
+        assert nn.result["out"][i][0] == res[0]
+        assert nn.result["out"][i][1] == res[1]
+
+
+@pytest.mark.parametrize("plugin", Plugins)
+@python3_only
+def test_node_8(plugin):
+    """Node with interface and inputs, running interface"""
+    interf_addvar = Function_Interface(fun_addvar, ["out"])
+    nn = NewNode(name="NA", interface=interf_addvar, base_dir="test8_{}".format(plugin))
+    nn.map(mapper=["a", "b"], inputs={"a": [3, 5], "b": [2, 1]})
+
+    assert nn.mapper == ["NA-a", "NA-b"]
+    assert (nn.inputs["NA-a"] == np.array([3, 5])).all()
+    assert (nn.inputs["NA-b"] == np.array([2, 1])).all()
+
+    # testing if the node runs properly
+    nn.run(plugin=plugin)
+
+    # checking the results
+    expected = [({"NA-a": 3, "NA-b": 1}, 4), ({"NA-a": 3, "NA-b": 2}, 5),
+                ({"NA-a": 5, "NA-b": 1}, 6), ({"NA-a": 5, "NA-b": 2}, 7)]
+    # to be sure that there is the same order (not sure if node itself should keep the order)
+    key_sort = list(expected[0][0].keys())
+    expected.sort(key=lambda t: [t[0][key] for key in key_sort])
+    nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort])
+    for i, res in enumerate(expected):
+        assert nn.result["out"][i][0] == res[0]
+        assert nn.result["out"][i][1] == res[1]

From 8300a98d3cd286534f3008ff7d066082f64d3848 Mon Sep 17 00:00:00 2001
From: Dorota Jarecka
Date: Tue, 17 Jul 2018 16:34:41 -0400
Subject: [PATCH 16/16] changing requirements to py>3.4 (fails with py3.4, not
 sure if we want to support it anyway)

---
 nipype/pipeline/engine/tests/test_newnode.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py
index d6c79f4b30..7a0c83be33 100644
--- a/nipype/pipeline/engine/tests/test_newnode.py
+++ b/nipype/pipeline/engine/tests/test_newnode.py
@@ -5,8 +5,8 @@
 import numpy as np
 import pytest, pdb
-python3_only = pytest.mark.skipif(sys.version_info < (3, 0),
-                                  reason="requires Python3")
+python35_only = pytest.mark.skipif(sys.version_info < (3, 5),
+                                   reason="requires Python>3.4")
 def fun_addtwo(a):
@@ -77,7 +77,7 @@ def test_node_5():
 Plugins = ["mp", "serial", "cf", "dask"]
 @pytest.mark.parametrize("plugin", Plugins)
-@python3_only
+@python35_only
 def test_node_6(plugin):
@@ -102,7 +102,7 @@ def test_node_6(plugin):
 @pytest.mark.parametrize("plugin", Plugins)
-@python3_only
+@python35_only
 def test_node_7(plugin):
     """Node with interface and inputs, running interface"""
     interf_addvar = Function_Interface(fun_addvar, ["out"])
@@ -128,7 +128,7 @@ def test_node_7(plugin):
 @pytest.mark.parametrize("plugin", Plugins)
-@python3_only
+@python35_only
 def test_node_8(plugin):
     """Node with interface and inputs, running interface"""
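
Taken together, the pieces this series introduces (Function_Interface from auxiliary.py, NewNode with its State, the Submitter plugins) compose as the tests above exercise them. A minimal sketch of the emerging API, assuming the patched tree and a writable base_dir; result order is not guaranteed, which is why the tests sort before comparing:

    from nipype.pipeline.engine import NewNode
    from nipype.pipeline.engine.auxiliary import Function_Interface

    def fun_addvar(a, b):
        return a + b

    # output names must be listed explicitly for a Function_Interface
    interf = Function_Interface(fun_addvar, ["out"])
    nn = NewNode(name="NA", interface=interf, base_dir="example")

    # ["a", "b"] is the cross-product mapper (2 x 2 = 4 state elements);
    # ("a", "b") would instead pair the inputs elementwise
    nn.map(mapper=["a", "b"], inputs={"a": [3, 5], "b": [1, 2]})
    nn.run(plugin="serial")  # or "mp", "cf", "dask"

    # each entry of nn.result["out"] is a (state, value) pair, e.g.
    # ({"NA-a": 3, "NA-b": 1}, 4) ... ({"NA-a": 5, "NA-b": 2}, 7)
    print(nn.result["out"])

Each run writes one out.txt per state element under base_dir (e.g. example/NA-a.3_NA-b.1/out.txt), and the result property reads those files back.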