diff --git a/nipype/exceptions.py b/nipype/exceptions.py new file mode 100644 index 0000000000..bb914cc4e0 --- /dev/null +++ b/nipype/exceptions.py @@ -0,0 +1,30 @@ +class NipypeError(Exception): + pass + + +class EngineError(Exception): + pass + + +class PipelineError(NipypeError): + pass + + +class NodeError(EngineError): + pass + + +class WorkflowError(NodeError): + pass + + +class MappingError(NodeError): + pass + + +class JoinError(NodeError): + pass + + +class InterfaceError(NipypeError): + pass diff --git a/nipype/info.py b/nipype/info.py index a371b70e2e..c8e384f7b0 100644 --- a/nipype/info.py +++ b/nipype/info.py @@ -148,6 +148,7 @@ def get_nipype_gitversion(): 'pydot>=%s' % PYDOT_MIN_VERSION, 'packaging', 'futures; python_version == "2.7"', + 'dask', ] if sys.version_info <= (3, 4): diff --git a/nipype/pipeline/engine/__init__.py b/nipype/pipeline/engine/__init__.py index e950086307..2a8ca8850f 100644 --- a/nipype/pipeline/engine/__init__.py +++ b/nipype/pipeline/engine/__init__.py @@ -9,6 +9,6 @@ from __future__ import absolute_import __docformat__ = 'restructuredtext' -from .workflows import Workflow +from .workflows import Workflow, NewNode, NewWorkflow from .nodes import Node, MapNode, JoinNode from .utils import generate_expanded_graph diff --git a/nipype/pipeline/engine/auxiliary.py b/nipype/pipeline/engine/auxiliary.py new file mode 100644 index 0000000000..16e15f9906 --- /dev/null +++ b/nipype/pipeline/engine/auxiliary.py @@ -0,0 +1,206 @@ +import pdb +import inspect +from ... import config, logging +logger = logging.getLogger('nipype.workflow') + + +# Function to change user provided mapper to "reverse polish notation" used in State +def mapper2rpn(mapper): + """ Functions that translate mapper to "reverse polish notation.""" + global output_mapper + output_mapper = [] + _ordering(mapper, i=0) + return output_mapper + + +def _ordering(el, i, current_sign=None): + """ Used in the mapper2rpn to get a proper order of fields and signs. """ + global output_mapper + if type(el) is tuple: + _iterate_list(el, ".") + elif type(el) is list: + _iterate_list(el, "*") + elif type(el) is str: + output_mapper.append(el) + else: + raise Exception("mapper has to be a string, a tuple or a list") + + if i > 0: + output_mapper.append(current_sign) + + +def _iterate_list(element, sign): + """ Used in the mapper2rpn to get recursion. """ + for i, el in enumerate(element): + _ordering(el, i, current_sign=sign) + + +# functions used in State to know which element should be used for a specific axis + +def mapping_axis(state_inputs, mapper_rpn): + """Having inputs and mapper (in rpn notation), functions returns the axes of output for every input.""" + axis_for_input = {} + stack = [] + current_axis = None + current_shape = None + + for el in mapper_rpn: + if el == ".": + right = stack.pop() + left = stack.pop() + if left == "OUT": + if state_inputs[right].shape == current_shape: #todo:should we allow for one-element array? 
+ axis_for_input[right] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + elif right == "OUT": + if state_inputs[left].shape == current_shape: + axis_for_input[left] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + else: + if state_inputs[right].shape == state_inputs[left].shape: + current_axis = list(range(state_inputs[right].ndim)) + current_shape = state_inputs[left].shape + axis_for_input[left] = current_axis + axis_for_input[right] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + stack.append("OUT") + + elif el == "*": + right = stack.pop() + left = stack.pop() + if left == "OUT": + axis_for_input[right] = [i + 1 + current_axis[-1] + for i in range(state_inputs[right].ndim)] + current_axis = current_axis + axis_for_input[right] + current_shape = tuple([i for i in current_shape + state_inputs[right].shape]) + elif right == "OUT": + for key in axis_for_input: + axis_for_input[key] = [i + state_inputs[left].ndim + for i in axis_for_input[key]] + + axis_for_input[left] = [i - len(current_shape) + current_axis[-1] + 1 + for i in range(state_inputs[left].ndim)] + current_axis = current_axis + [i + 1 + current_axis[-1] + for i in range(state_inputs[left].ndim)] + current_shape = tuple([i for i in state_inputs[left].shape + current_shape]) + else: + axis_for_input[left] = list(range(state_inputs[left].ndim)) + axis_for_input[right] = [i + state_inputs[left].ndim + for i in range(state_inputs[right].ndim)] + current_axis = axis_for_input[left] + axis_for_input[right] + current_shape = tuple([i for i in + state_inputs[left].shape + state_inputs[right].shape]) + stack.append("OUT") + + else: + stack.append(el) + + if len(stack) == 0: + pass + elif len(stack) > 1: + raise Exception("exception from mapping_axis") + elif stack[0] != "OUT": + current_axis = [i for i in range(state_inputs[stack[0]].ndim)] + axis_for_input[stack[0]] = current_axis + + if current_axis: + ndim = max(current_axis) + 1 + else: + ndim = 0 + return axis_for_input, ndim + + +def converting_axis2input(state_inputs, axis_for_input, ndim): + """ Having axes for all the input fields, the function returns fields for each axis. 
""" + input_for_axis = [] + shape = [] + for i in range(ndim): + input_for_axis.append([]) + shape.append(0) + + for inp, axis in axis_for_input.items(): + for (i, ax) in enumerate(axis): + input_for_axis[ax].append(inp) + shape[ax] = state_inputs[inp].shape[i] + + return input_for_axis, shape + + +# used in the Node to change names in a mapper + +def change_mapper(mapper, name): + """changing names of mapper: adding names of the node""" + if isinstance(mapper, str): + if "-" in mapper: + return mapper + else: + return "{}-{}".format(name, mapper) + elif isinstance(mapper, list): + return _add_name(mapper, name) + elif isinstance(mapper, tuple): + mapper_l = list(mapper) + return tuple(_add_name(mapper_l, name)) + + +def _add_name(mlist, name): + for i, elem in enumerate(mlist): + if isinstance(elem, str): + if "-" in elem: + pass + else: + mlist[i] = "{}-{}".format(name, mlist[i]) + elif isinstance(elem, list): + mlist[i] = _add_name(elem, name) + elif isinstance(elem, tuple): + mlist[i] = list(elem) + mlist[i] = _add_name(mlist[i], name) + mlist[i] = tuple(mlist[i]) + return mlist + + +#Function interface + +class Function_Interface(object): + """ A new function interface """ + def __init__(self, function, output_nm, input_map=None): + self.function = function + if type(output_nm) is list: + self._output_nm = output_nm + else: + raise Exception("output_nm should be a list") + if not input_map: + self.input_map = {} + # TODO use signature + for key in inspect.getargspec(function)[0]: + if key not in self.input_map.keys(): + self.input_map[key] = key + + + def run(self, input): + self.output = {} + if self.input_map: + for (key_fun, key_inp) in self.input_map.items(): + try: + input[key_fun] = input.pop(key_inp) + except KeyError: + raise Exception("no {} in the input dictionary".format(key_inp)) + fun_output = self.function(**input) + logger.debug("Function Interf, input={}, fun_out={}".format(input, fun_output)) + if type(fun_output) is tuple: + if len(self._output_nm) == len(fun_output): + for i, out in enumerate(fun_output): + self.output[self._output_nm[i]] = out + else: + raise Exception("length of output_nm doesnt match length of the function output") + elif len(self._output_nm)==1: + self.output[self._output_nm[0]] = fun_output + else: + raise Exception("output_nm doesnt match length of the function output") + + return fun_output diff --git a/nipype/pipeline/engine/state.py b/nipype/pipeline/engine/state.py new file mode 100644 index 0000000000..ec96dab04c --- /dev/null +++ b/nipype/pipeline/engine/state.py @@ -0,0 +1,85 @@ +from collections import OrderedDict +import pdb + +from . import auxiliary as aux + +class State(object): + def __init__(self, state_inputs, node_name, mapper=None): + self.state_inputs = state_inputs + + self._mapper = mapper + self.node_name = node_name + if self._mapper: + # changing mapper (as in rpn), so I can read from left to right + # e.g. if mapper=('d', ['e', 'r']), _mapper_rpn=['d', 'e', 'r', '*', '.'] + self._mapper_rpn = aux.mapper2rpn(self._mapper) + self._input_names_mapper = [i for i in self._mapper_rpn if i not in ["*", "."]] + else: + self._mapper_rpn = [] + self._input_names_mapper = [] + # not all input field have to be use in the mapper, can be an extra scalar + self._input_names = list(self.state_inputs.keys()) + + # dictionary[key=input names] = list of axes related to + # e.g. 
{'r': [1], 'e': [0], 'd': [0, 1]} + # ndim - int, number of dimension for the "final array" (that is not created) + self._axis_for_input, self._ndim = aux.mapping_axis(self.state_inputs, self._mapper_rpn) + + # list of inputs variable for each axis + # e.g. [['e', 'd'], ['r', 'd']] + # shape - list, e.g. [2,3] + self._input_for_axis, self._shape = aux.converting_axis2input(self.state_inputs, + self._axis_for_input, self._ndim) + + # list of all possible indexes in each dim, will be use to iterate + # e.g. [[0, 1], [0, 1, 2]] + self._all_elements = [range(i) for i in self._shape] + + + def __getitem__(self, key): + if type(key) is int: + key = (key,) + return self.state_values(key) + + @property + def all_elements(self): + return self._all_elements + + # not used? + #@property + #def mapper(self): + # return self._mapper + + + @property + def ndim(self): + return self._ndim + + + @property + def shape(self): + return self._shape + + + def state_values(self, ind): + if len(ind) > self._ndim: + raise IndexError("too many indices") + + for ii, index in enumerate(ind): + if index > self._shape[ii] - 1: + raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii])) + + state_dict = {} + for input, ax in self._axis_for_input.items(): + # checking which axes are important for the input + sl_ax = slice(ax[0], ax[-1]+1) + # taking the indexes for the axes + ind_inp = tuple(ind[sl_ax]) #used to be list + state_dict[input] = self.state_inputs[input][ind_inp] + # adding values from input that are not used in the mapper + for input in set(self._input_names) - set(self._input_names_mapper): + state_dict[input] = self.state_inputs[input] + + # in py3.7 we can skip OrderedDict + # returning a named tuple? + return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0])) diff --git a/nipype/pipeline/engine/submitter.py b/nipype/pipeline/engine/submitter.py new file mode 100644 index 0000000000..d04c29b550 --- /dev/null +++ b/nipype/pipeline/engine/submitter.py @@ -0,0 +1,154 @@ +from __future__ import print_function, division, unicode_literals, absolute_import +from builtins import object +from collections import defaultdict + +from future import standard_library +standard_library.install_aliases() + +import os, pdb, time, glob +import itertools, collections +import queue + +from .workers import MpWorker, SerialWorker, DaskWorker, ConcurrentFuturesWorker + +from ... 
import config, logging
+logger = logging.getLogger('nipype.workflow')
+
+
+class Submitter(object):
+    def __init__(self, plugin):
+        self.plugin = plugin
+        self.node_line = []
+        if self.plugin == "mp":
+            self.worker = MpWorker()
+        elif self.plugin == "serial":
+            self.worker = SerialWorker()
+        elif self.plugin == "dask":
+            self.worker = DaskWorker()
+        elif self.plugin == "cf":
+            self.worker = ConcurrentFuturesWorker()
+        else:
+            raise Exception("plugin {} not available".format(self.plugin))
+
+    def submit_work(self, node):
+        for (i, ind) in enumerate(itertools.product(*node.state.all_elements)):
+            self._submit_work_el(node, i, ind)
+
+    def _submit_work_el(self, node, i, ind):
+        logger.debug("SUBMIT WORKER, node: {}, ind: {}".format(node, ind))
+        self.worker.run_el(node.run_interface_el, (i, ind))
+
+    def close(self):
+        self.worker.close()
+
+
+class SubmitterNode(Submitter):
+    def __init__(self, plugin, node):
+        super(SubmitterNode, self).__init__(plugin)
+        self.node = node
+
+    def run_node(self):
+        self.submit_work(self.node)
+
+        while not self.node.global_done:
+            logger.debug("Submitter, in while, to_finish: {}".format(self.node))
+            time.sleep(3)
+
+
+class SubmitterWorkflow(Submitter):
+    def __init__(self, graph, plugin):
+        super(SubmitterWorkflow, self).__init__(plugin)
+        self.graph = graph
+        logger.debug('Initialize Submitter, graph: {}'.format(graph))
+        self._to_finish = list(self.graph)
+
+    def run_workflow(self):
+        for (i_n, node) in enumerate(self.graph):
+            # submitting all the nodes that are self-sufficient (self.graph is already sorted)
+            if node.sufficient:
+                self.submit_work(node)
+            # if it is not, it will be added to the node_line below
+            else:
+                break
+
+        # in case no element of the graph reaches the break,
+        # make sure the last node is not submitted again in the next for loop
+        if i_n == len(self.graph) - 1:
+            i_n += 1
+
+        # adding a task for the reducer
+        if node._join_interface:
+            # added as a single task, since all elements have to finish before it can start anyway
+            self.node_line.append((node, "join", None))
+
+        # all nodes that are not self-sufficient go to the line, iterating over all elements
+        # (an ordered list works well here, since it is more efficient to check in a specific order)
+        for nn in self.graph[i_n:]:
+            for (i, ind) in enumerate(itertools.product(*nn.state.all_elements)):
+                self.node_line.append((nn, i, ind))
+            if nn._join_interface:
+                # added as a single task, since all elements have to finish before it can start anyway
+                self.node_line.append((nn, "join", None))
+
+        # this part submits nodes that are waiting to be run;
+        # it should stop when nothing is waiting
+        while self._nodes_check():
+            logger.debug("Submitter, in while, node_line: {}".format(self.node_line))
+            time.sleep(3)
+
+        # TODO(?): combine the two while loops
+        # this part simply waits for all "last nodes" to finish
+        while self._output_check():
+            logger.debug("Submitter, in while, to_finish: {}".format(self._to_finish))
+            time.sleep(3)
+
+    # for now without a callback, so checking all nodes (with ind) in some order
+    def _nodes_check(self):
+        _to_remove = []
+        for (to_node, i, ind) in self.node_line:
+            if i == "join":
+                if to_node.global_done:  # have to check if the interface has finished
+                    self.submit_join_work(to_node)
+                    _to_remove.append((to_node, i, ind))
+            else:
+                if to_node.checking_input_el(ind):
+                    self._submit_work_el(to_node, i, ind)
+                    _to_remove.append((to_node, i, ind))
+        # can't remove while iterating
+        for rn
in _to_remove: + self.node_line.remove(rn) + return self.node_line + + + # this I believe can be done for entire node + def _output_check(self): + _to_remove = [] + for node in self._to_finish: + print("_output check node", node,node.global_done, node._join_interface, node._global_done_join ) + if node.global_done: + if node._join_interface: + if node.global_done_join: + _to_remove.append(node) + else: + _to_remove.append(node) + for rn in _to_remove: + self._to_finish.remove(rn) + return self._to_finish + + + def submit_join_work(self, node): + logger.debug("SUBMIT JOIN WORKER, node: {}".format(node)) + for (state_redu, res_redu) in node.result[node._join_interface_input]: # TODO, should be more general than out + res_redu_l = [i[1] for i in res_redu] + self.worker.run_el(node.run_interface_join_el, (state_redu, res_redu_l)) diff --git a/nipype/pipeline/engine/tests/test_auxiliary.py b/nipype/pipeline/engine/tests/test_auxiliary.py new file mode 100644 index 0000000000..d5d3ffb536 --- /dev/null +++ b/nipype/pipeline/engine/tests/test_auxiliary.py @@ -0,0 +1,73 @@ +from .. import auxiliary as aux + +import numpy as np +import pytest + +@pytest.mark.parametrize("mapper, rpn", + [ + ("a", ["a"]), + (("a", "b"), ["a", "b", "."]), + (["a", "b"], ["a", "b", "*"]), + (["a", ("b", "c")], ["a", "b", "c", ".", "*"]), + ([("a", "b"), "c"], ["a", "b", ".", "c", "*"]), + (["a", ("b", ["c", "d"])], ["a", "b", "c", "d", "*", ".", "*"]) + ]) +def test_mapper2rpn(mapper, rpn): + assert aux.mapper2rpn(mapper) == rpn + + +@pytest.mark.parametrize("mapper, mapper_changed", + [ + ("a", "Node-a"), + (["a", ("b", "c")], ["Node-a", ("Node-b", "Node-c")]), + (("a", ["b", "c"]), ("Node-a", ["Node-b", "Node-c"])) + ]) +def test_change_mapper(mapper, mapper_changed): + assert aux.change_mapper(mapper, "Node") == mapper_changed + + +@pytest.mark.parametrize("inputs, rpn, expected", + [ + ({"a": np.array([1, 2])}, ["a"], {"a": [0]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4])}, ["a", "b", "."], {"a": [0], "b": [0]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, ["a", "b", "*"], {"a": [0], "b": [1]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, ["a", "b", ".", "c", "*"], + {"a": [0], "b": [0], "c": [1]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, + ["c", "a", "b", ".", "*"], {"a": [1], "b": [1], "c": [0]}), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])}, + ["a", "b", ".", "c", "*"], {"a": [0, 1], "b": [0, 1], "c": [2]}), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), + "c": np.array([1, 2, 3])}, ["c", "a", "b", ".", "*"], {"a": [1, 2], "b": [1, 2], "c": [0]}) + ]) +def test_mapping_axis(inputs, rpn, expected): + res = aux.mapping_axis(inputs, rpn)[0] + print(res) + for key in inputs.keys(): + assert res[key] == expected[key] + + +def test_mapping_axis_error(): + with pytest.raises(Exception): + aux.mapping_axis({"a": np.array([1, 2]), "b": np.array([3, 4, 5])}, ["a", "b", "."]) + + +@pytest.mark.parametrize("inputs, axis_inputs, ndim, expected", + [ + ({"a": np.array([1, 2])}, {"a": [0]}, 1, [["a"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4])}, {"a": [0], "b": [0]}, 1, + [["a", "b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, {"a": [0], "b": [1]}, 2, + [["a"], ["b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, + {"a": [0], "b": [0], "c": [1]}, 2, [["a", "b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), 
"c": np.array([1, 2, 3])}, + {"a": [1], "b": [1], "c": [0]}, 2, [["c"], ["a", "b"]]), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])}, + {"a": [0, 1], "b": [0, 1], "c": [2]}, 3, [["a", "b"], ["a", "b"], ["c"]]), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), + "c": np.array([1, 2, 3])}, {"a": [1, 2], "b": [1, 2], "c": [0]}, 3, + [["c"], ["a", "b"], ["a", "b"]]) + ]) +def test_converting_axis2input(inputs, axis_inputs, ndim, expected): + aux.converting_axis2input(inputs, axis_inputs, ndim)[0] == expected \ No newline at end of file diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py new file mode 100644 index 0000000000..7a0c83be33 --- /dev/null +++ b/nipype/pipeline/engine/tests/test_newnode.py @@ -0,0 +1,154 @@ +from .. import NewNode +from ..auxiliary import Function_Interface + +import sys +import numpy as np +import pytest, pdb + +python35_only = pytest.mark.skipif(sys.version_info < (3, 5), + reason="requires Python>3.4") + + +def fun_addtwo(a): + return a + 2 + + +def fun_addvar(a, b): + return a + b + + + +def test_node_1(): + """Node with only mandatory arguments""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo) + assert nn.mapper is None + assert nn.inputs == {} + assert nn.state._mapper is None + + +def test_node_2(): + """Node with interface and inputs""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": 3}) + assert nn.mapper is None + assert nn.inputs == {"NA-a": 3} + assert nn.state._mapper is None + + +def test_node_3(): + """Node with interface and inputs""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": [3, 5]}, mapper="a") + assert nn.mapper == "NA-a" + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "NA-a" + assert nn.state.state_values([0]) == {"NA-a": 3} + assert nn.state.state_values([1]) == {"NA-a": 5} + + +def test_node_4(): + """Node with interface and inputs""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": [3, 5]}) + nn.map(mapper="a") + assert nn.mapper == "NA-a" + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "NA-a" + assert nn.state.state_values([0]) == {"NA-a": 3} + assert nn.state.state_values([1]) == {"NA-a": 5} + + +def test_node_5(): + """Node with interface and inputs""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo) + nn.map(mapper="a", inputs={"a": [3, 5]}) + assert nn.mapper == "NA-a" + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "NA-a" + assert nn.state.state_values([0]) == {"NA-a": 3} + assert nn.state.state_values([1]) == {"NA-a": 5} + + +Plugins = ["mp", "serial", "cf", "dask"] + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_node_6(plugin): + """Node with interface and inputs, running interface""" + interf_addtwo = Function_Interface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo, base_dir="test6_{}".format(plugin)) + nn.map(mapper="a", inputs={"a": [3, 5]}) + + assert nn.mapper == "NA-a" + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() + + # testing if the node runs properly + nn.run(plugin=plugin) + + # checking teh 
results + expected = [({"NA-a": 3}, 5), ({"NA-a": 5}, 7)] + # to be sure that there is the same order (not sure if node itself should keep the order) + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert nn.result["out"][i][0] == res[0] + assert nn.result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_node_7(plugin): + """Node with interface and inputs, running interface""" + interf_addvar = Function_Interface(fun_addvar, ["out"]) + nn = NewNode(name="NA", interface=interf_addvar, base_dir="test7_{}".format(plugin)) + nn.map(mapper=("a", "b"), inputs={"a": [3, 5], "b": [2, 1]}) + + assert nn.mapper == ("NA-a", "NA-b") + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() + assert (nn.inputs["NA-b"] == np.array([2, 1])).all() + + # testing if the node runs properly + nn.run(plugin=plugin) + + # checking teh results + expected = [({"NA-a": 3, "NA-b": 2}, 5), ({"NA-a": 5, "NA-b": 1}, 6)] + # to be sure that there is the same order (not sure if node itself should keep the order) + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert nn.result["out"][i][0] == res[0] + assert nn.result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_node_8(plugin): + """Node with interface and inputs, running interface""" + interf_addvar = Function_Interface(fun_addvar, ["out"]) + nn = NewNode(name="NA", interface=interf_addvar, base_dir="test8_{}".format(plugin)) + nn.map(mapper=["a", "b"], inputs={"a": [3, 5], "b": [2, 1]}) + + assert nn.mapper == ["NA-a", "NA-b"] + assert (nn.inputs["NA-a"] == np.array([3, 5])).all() + assert (nn.inputs["NA-b"] == np.array([2, 1])).all() + + # testing if the node runs properly + nn.run(plugin=plugin) + + # checking teh results + expected = [({"NA-a": 3, "NA-b": 1}, 4), ({"NA-a": 3, "NA-b": 2}, 5), + ({"NA-a": 5, "NA-b": 1}, 6), ({"NA-a": 5, "NA-b": 2}, 7)] + # to be sure that there is the same order (not sure if node itself should keep the order) + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert nn.result["out"][i][0] == res[0] + assert nn.result["out"][i][1] == res[1] diff --git a/nipype/pipeline/engine/workers.py b/nipype/pipeline/engine/workers.py new file mode 100644 index 0000000000..d20a12725c --- /dev/null +++ b/nipype/pipeline/engine/workers.py @@ -0,0 +1,99 @@ +from __future__ import print_function, division, unicode_literals, absolute_import +from builtins import object +from collections import defaultdict + +from future import standard_library +standard_library.install_aliases() + +from copy import deepcopy +import re, os, pdb, time +import multiprocessing as mp +#import multiprocess as mp +import itertools + +#from pycon_utils import make_cluster +from dask.distributed import Client + +import concurrent.futures as cf + +from ... 
import config, logging +logger = logging.getLogger('nipype.workflow') + + +class Worker(object): + def __init__(self): + logger.debug("Initialize Worker") + pass + + def run_el(self): + raise NotImplementedError + + def close(self): + raise NotImplementedError + + +class MpWorker(Worker): + def __init__(self, nr_proc=4): #should be none + self.nr_proc = nr_proc + self.pool = mp.Pool(processes=self.nr_proc) + logger.debug('Initialize MpWorker') + + def run_el(self, interface, inp): + self.pool.apply_async(interface, (inp[0], inp[1])) + + def close(self): + # added this method since I was having somtetimes problem with reading results from (existing) files + # i thought that pool.close() should work, but still was getting some errors, so testing terminate + self.pool.terminate() + + +class SerialWorker(Worker): + def __init__(self): + logger.debug("Initialize SerialWorker") + pass + + def run_el(self, interface, inp): + interface(inp[0], inp[1]) + + def close(self): + pass + + +class ConcurrentFuturesWorker(Worker): + def __init__(self, nr_proc=4): + self.nr_proc = nr_proc + self.pool = cf.ProcessPoolExecutor(self.nr_proc) + logger.debug('Initialize ConcurrentFuture') + + def run_el(self, interface, inp): + x = self.pool.submit(interface, inp[0], inp[1]) + #print("X, DONE", x.done()) + x.add_done_callback(lambda x: print("DONE ", interface, inp, x.done)) + #print("DIR", x.result()) + + def close(self): + self.pool.shutdown() + + +class DaskWorker(Worker): + def __init__(self): + from distributed.deploy.local import LocalCluster + logger.debug("Initialize Dask Worker") + #self.cluster = LocalCluster() + self.client = Client()#self.cluster) + #print("BOKEH", self.client.scheduler_info()["address"] + ":" + str(self.client.scheduler_info()["services"]["bokeh"])) + + + def run_el(self, interface, inp): + print("DASK, run_el: ", interface, inp) + x = self.client.submit(interface, inp[0], inp[1]) + print("DASK, status: ", x.status) + # this important, otherwise dask will not finish the job + x.add_done_callback(lambda x: print("DONE ", interface, inp)) + #print("res", x.result()) + + + def close(self): + #self.cluster.close() + self.client.close() + diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index d2f040786e..fb85343d72 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -10,7 +10,7 @@ absolute_import) from builtins import str, bytes, open -import os +import os, glob import os.path as op import sys from datetime import datetime @@ -20,8 +20,10 @@ import numpy as np import networkx as nx +import itertools, collections from ... import config, logging +from ...exceptions import NodeError, WorkflowError, MappingError, JoinError from ...utils.misc import str2bool from ...utils.functions import (getsource, create_function_from_source) @@ -33,7 +35,12 @@ get_print_name, merge_dict, format_node) from .base import EngineBase -from .nodes import MapNode +from .nodes import MapNode, Node +from . import state +from . import auxiliary as aux +from . 
import submitter as sub + +import pdb # Py2 compat: http://python-future.org/compatible_idioms.html#collections-counter-and-ordereddict from future import standard_library @@ -1043,3 +1050,311 @@ def _get_dot(self, vname1.replace('.', '_'))) logger.debug('cross connection: %s', dotlist[-1]) return ('\n' + prefix).join(dotlist) + + def add(self, name, node_like): + if is_interface(node_like): + node = Node(node_like, name=name) + elif is_node(node_like): + node = node_like + + self.add_nodes([node]) + + +class Map(Node): + pass + + +class Join(Node): + pass + + +class MapState(object): + pass + +class NewNodeBase(EngineBase): + def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, + base_dir=None, *args, **kwargs): + super(NewNodeBase, self).__init__(name=name, base_dir=base_dir) + # dj: should be changed for wf + self.nodedir = self.base_dir + # dj: do I need a state_input and state_mapper?? + # dj: reading the input from files should be added + if inputs: + # adding name of the node to the input name + self._inputs = dict(("{}-{}".format(self.name, key), value) for (key, value) in inputs.items()) + self._inputs = dict((key, np.array(val)) if type(val) is list else (key, val) + for (key, val) in self._inputs.items()) + else: + self._inputs = {} + if mapper: + # adding name of the node to the input name within the mapper + mapper = aux.change_mapper(mapper, self.name) + self._mapper = mapper + # create state (takes care of mapper, connects inputs with axes, so we can ask for specifc element) + self.state = state.State(state_inputs=self._inputs, mapper=self._mapper, node_name=self.name) + + # adding interface: i'm using Function Interface from aux that has input_map that can change the name of arguments + self._interface = interface + self._interface.input_map = dict((key, "{}-{}".format(self.name, value)) + for (key, value) in self._interface.input_map.items()) + + self.needed_outputs = [] + self._out_nm = self._interface._output_nm + self._global_done = False + self._result = {} + + self._joiners = {} + + + @property + def mapper(self): + return self._mapper + + + @property + def inputs(self): + return self._inputs + + + def map(self, mapper, inputs=None): + if self._mapper: + raise Exception("mapper is already set") + else: + self._mapper = aux.change_mapper(mapper, self.name) + + if inputs: + inputs = dict(("{}-{}".format(self.name, key), value) for (key, value) in inputs.items()) + inputs = dict((key, np.array(val)) if type(val) is list else (key, val) + for (key, val) in inputs.items()) + self._inputs.update(inputs) + self.state = state.State(state_inputs=self._inputs, mapper=self._mapper, node_name=self.name) + + + +# def map_orig(self, field, values=None): +# if isinstance(field, list): +# for field_ +# if values is not None: +# if len(values != len(field)): +# elif isinstance(field, tuple): +# pass +# if values is None: +# values = getattr(self._inputs, field) +# if values is None: +# raise MappingError('Cannot map unassigned input field') +# self._mappers[field] = values + + # TBD + def join(self, field): + pass + + + def run_interface_el(self, i, ind, single_node=False): + """ running interface one element generated from node_state.""" + logger.debug("Run interface el, name={}, i={}, ind={}".format(self.name, i, ind)) + if not single_node: # if we run a single node, we don't have to collect output + state_dict, inputs_dict = self._collecting_input_el(ind) + logger.debug("Run interface el, name={}, inputs_dict={}, state_dict={}".format( + self.name, 
inputs_dict, state_dict)) + res = self._interface.run(inputs_dict) + #pdb.set_trace() + output = self._interface.output + logger.debug("Run interface el, output={}".format(output)) + dir_nm_el = "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items())]) + # TODO when join + #if self._joinByKey: + # dir_join = "join_" + "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items()) if i not in self._joinByKey]) + #elif self._join: + # dir_join = "join_" + #if self._joinByKey or self._join: + # os.makedirs(os.path.join(self.nodedir, dir_join), exist_ok=True) + # dir_nm_el = os.path.join(dir_join, dir_nm_el) + os.makedirs(os.path.join(self.nodedir, dir_nm_el), exist_ok=True) + for key_out in list(output.keys()): + with open(os.path.join(self.nodedir, dir_nm_el, key_out+".txt"), "w") as fout: + fout.write(str(output[key_out])) + return res + + # dj: this is not used for a single node + def _collecting_input_el(self, ind): + state_dict = self.state.state_values(ind) + inputs_dict = {k: state_dict[k] for k in self._inputs.keys()} + # reading extra inputs that come from previous nodes + for (from_node, from_socket, to_socket) in self.needed_outputs: + dir_nm_el_from = "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items()) + if i in list(from_node.state_inputs.keys())]) + file_from = os.path.join(from_node.nodedir, dir_nm_el_from, from_socket+".txt") + with open(file_from) as f: + inputs_dict["{}-{}".format(self.name, to_socket)] = eval(f.readline()) + return state_dict, inputs_dict + + + + # checking if all outputs are saved + @property + def global_done(self): + # once _global_done os True, this should not change + logger.debug('global_done {}'.format(self._global_done)) + if self._global_done: + return self._global_done + else: + return self._check_all_results() + + # dj: version without join + def _check_all_results(self): + # checking if all files that should be created are present + for ind in itertools.product(*self.state._all_elements): + state_dict = self.state.state_values(ind) + dir_nm_el = "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items())]) + for key_out in self._out_nm: + if not os.path.isfile(os.path.join(self.nodedir, dir_nm_el, key_out+".txt")): + return False + self._global_done = True + return True + + + # reading results (without join for now) + @property + def result(self): + if not self._result: + self._reading_results() + return self._result + + + def _reading_results(self): + """ + reading results from file, + doesn't check if everything is ready, i.e. 
if self.global_done"""
+        for key_out in self._out_nm:
+            self._result[key_out] = []
+            if self.inputs:  # self.state_inputs:
+                files = [name for name in glob.glob("{}/*/{}.txt".format(self.nodedir, key_out))]
+                for file in files:
+                    st_el = file.split(os.sep)[-2].split("_")
+                    st_dict = collections.OrderedDict([(el.split(".")[0], eval(el.split(".")[1]))
+                                                       for el in st_el])
+                    with open(file) as fout:
+                        logger.debug('Reading Results: file={}, st_dict={}'.format(file, st_dict))
+                        self._result[key_out].append((st_dict, eval(fout.readline())))
+            # for nodes without input
+            else:
+                files = [name for name in glob.glob("{}/{}.txt".format(self.nodedir, key_out))]
+                with open(files[0]) as fout:
+                    self._result[key_out].append(({}, eval(fout.readline())))
+
+
+class NewNode(object):
+    """Wrapper around NewNodeBase that mostly adds the run method."""
+    def __init__(self, name, interface, inputs=None, mapper=None, join_by=None,
+                 base_dir=None, *args, **kwargs):
+        self.node = NewNodeBase(name, interface, inputs, mapper, join_by,
+                                base_dir, *args, **kwargs)
+        # dj: might want to use a one-element graph
+        #self.graph = nx.DiGraph()
+        #self.graph.add_nodes_from([self.node])
+
+    def map(self, mapper, inputs=None):
+        self.node.map(mapper, inputs)
+
+    @property
+    def state(self):
+        return self.node.state
+
+    @property
+    def mapper(self):
+        return self.node.mapper
+
+    @property
+    def inputs(self):
+        return self.node._inputs
+
+    @property
+    def global_done(self):
+        return self.node._global_done
+
+    @property
+    def outputs(self):
+        return self.node.outputs
+
+    @property
+    def result(self):
+        return self.node.result
+
+    def run(self, plugin="serial"):
+        self.sub = sub.SubmitterNode(plugin, node=self.node)
+        self.sub.run_node()
+        self.sub.close()
+
+
+class NewWorkflow(NewNode):
+    def __init__(self, inputs={}, *args, **kwargs):
+        super(NewWorkflow, self).__init__(*args, **kwargs)
+
+        self._nodes = {}
+        self._mappers = {}
+
+        mro = self.__class__.mro()
+        wf_klasses = mro[:mro.index(NewWorkflow)][::-1]
+        items = {}
+        for klass in wf_klasses:
+            items.update(klass.__dict__)
+        for name, runnable in items.items():
+            if name in ('__module__', '__doc__'):
+                continue
+            self.add(name, runnable)
+
+    def add(self, name, runnable):
+        if is_function(runnable):
+            node = Node(Function(function=runnable), name=name)
+        elif is_interface(runnable):
+            node = Node(runnable, name=name)
+        elif is_node(runnable):
+            node = runnable if runnable.name == name else runnable.clone(name=name)
+        else:
+            raise ValueError("Unknown workflow element: {!r}".format(runnable))
+        setattr(self, name, node)
+        self._nodes[name] = node
+        self._last_added = name
+
+    def map(self, field, node=None, values=None):
+        if node is None:
+            if '.' in field:
+                node, field = field.rsplit('.', 1)
+            else:
+                node = self._last_added
+
+        if '.' in node:
+            subwf, node = node.split('.', 1)
+            self._nodes[subwf].map(field, node, values)
+            return
+
+        if node in self._mappers:
+            raise WorkflowError("Cannot assign two mappings to the same input")
+
+        self._mappers[node] = (field, values)
+
+    def join(self, field, node=None):
+        pass
+
+
+def is_function(obj):
+    return hasattr(obj, '__call__')
+
+
+def is_interface(obj):
+    return all(hasattr(obj, protocol)
+               for protocol in ('input_spec', 'output_spec', 'run'))
+
+
+def is_node(obj):
+    return hasattr(obj, 'itername')
diff --git a/requirements.txt b/requirements.txt
index 5ef00ec98b..6daeee21ab 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -17,3 +17,5 @@ mock
 pydotplus
 pydot>=1.2.3
 packaging
+dask
+distributed
diff --git a/rtd_requirements.txt b/rtd_requirements.txt
index 68a366bbdf..6a84e7c2aa 100644
--- a/rtd_requirements.txt
+++ b/rtd_requirements.txt
@@ -17,3 +17,4 @@ psutil
 matplotlib
 packaging
 numpydoc
+dask
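
A short usage sketch (not part of the diff) of the API this patch adds, put together from the tests in test_newnode.py above. The Function_Interface signature, the NewNode/map arguments, the plugin names and the "NA-" prefixing of input names are all taken from the added files; the base_dir value "wf_example" is just an illustrative choice.

# Minimal usage sketch based on nipype/pipeline/engine/tests/test_newnode.py;
# "wf_example" is an arbitrary working-directory name chosen for illustration.
import numpy as np

from nipype.pipeline.engine import NewNode
from nipype.pipeline.engine.auxiliary import Function_Interface


def fun_addvar(a, b):
    return a + b

# wrap a plain function; "out" names its single output
interf_addvar = Function_Interface(fun_addvar, ["out"])

# create a node and map over the cross product of "a" and "b"
# (a scalar mapper ("a", "b") would instead pair the arrays element-wise)
nn = NewNode(name="NA", interface=interf_addvar, base_dir="wf_example")
nn.map(mapper=["a", "b"], inputs={"a": [3, 5], "b": [2, 1]})

# run all state elements; "mp", "cf" and "dask" are the other accepted plugins
nn.run(plugin="serial")

# results are (state, value) pairs, e.g. ({"NA-a": 3, "NA-b": 2}, 5)
for state, value in nn.result["out"]:
    print(state, value)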