diff --git a/nipype/exceptions.py b/nipype/exceptions.py index f6352c78f0..e5025ee820 100644 --- a/nipype/exceptions.py +++ b/nipype/exceptions.py @@ -2,6 +2,10 @@ class NipypeError(Exception): pass +class EngineError(Exception): + pass + + class PipelineError(NipypeError): pass diff --git a/nipype/info.py b/nipype/info.py index a371b70e2e..c8e384f7b0 100644 --- a/nipype/info.py +++ b/nipype/info.py @@ -148,6 +148,7 @@ def get_nipype_gitversion(): 'pydot>=%s' % PYDOT_MIN_VERSION, 'packaging', 'futures; python_version == "2.7"', + 'dask', ] if sys.version_info <= (3, 4): diff --git a/nipype/pipeline/engine/__init__.py b/nipype/pipeline/engine/__init__.py index e950086307..2a8ca8850f 100644 --- a/nipype/pipeline/engine/__init__.py +++ b/nipype/pipeline/engine/__init__.py @@ -9,6 +9,6 @@ from __future__ import absolute_import __docformat__ = 'restructuredtext' -from .workflows import Workflow +from .workflows import Workflow, NewNode, NewWorkflow from .nodes import Node, MapNode, JoinNode from .utils import generate_expanded_graph diff --git a/nipype/pipeline/engine/auxiliary.py b/nipype/pipeline/engine/auxiliary.py new file mode 100644 index 0000000000..d86702bd7f --- /dev/null +++ b/nipype/pipeline/engine/auxiliary.py @@ -0,0 +1,268 @@ +import pdb +import inspect, os +from ... import config, logging +logger = logging.getLogger('nipype.workflow') +from .nodes import Node + + +# dj: might create a new class or move to State + +# Function to change user provided mapper to "reverse polish notation" used in State +def mapper2rpn(mapper, other_mappers=None): + """ Functions that translate mapper to "reverse polish notation.""" + output_mapper = [] + _ordering(mapper, i=0, output_mapper=output_mapper, other_mappers=other_mappers) + return output_mapper + + +def _ordering(el, i, output_mapper, current_sign=None, other_mappers=None): + """ Used in the mapper2rpn to get a proper order of fields and signs. """ + if type(el) is tuple: + # checking if the mapper dont contain mapper from previous nodes, i.e. has str "_NA", etc. 
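+        # e.g. el = ("_NA", "b"): "_NA" is replaced by NA's own mapper (with the "NA." prefix added) before the "." combination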
+ if type(el[0]) is str and el[0].startswith("_"): + node_nm = el[0][1:] + if node_nm not in other_mappers: + raise Exception("can't ask for mapper from {}".format(node_nm)) + mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm) + el = (mapper_mod, el[1]) + if type(el[1]) is str and el[1].startswith("_"): + node_nm = el[1][1:] + if node_nm not in other_mappers: + raise Exception("can't ask for mapper from {}".format(node_nm)) + mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm) + el = (el[0], mapper_mod) + _iterate_list(el, ".", other_mappers, output_mapper=output_mapper) + elif type(el) is list: + if type(el[0]) is str and el[0].startswith("_"): + node_nm = el[0][1:] + if node_nm not in other_mappers: + raise Exception("can't ask for mapper from {}".format(node_nm)) + mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm) + el[0] = mapper_mod + if type(el[1]) is str and el[1].startswith("_"): + node_nm = el[1][1:] + if node_nm not in other_mappers: + raise Exception("can't ask for mapper from {}".format(node_nm)) + mapper_mod = change_mapper(mapper=other_mappers[node_nm], name=node_nm) + el[1] = mapper_mod + _iterate_list(el, "*", other_mappers, output_mapper=output_mapper) + elif type(el) is str: + output_mapper.append(el) + else: + raise Exception("mapper has to be a string, a tuple or a list") + + if i > 0: + output_mapper.append(current_sign) + + +def _iterate_list(element, sign, other_mappers, output_mapper): + """ Used in the mapper2rpn to get recursion. """ + for i, el in enumerate(element): + _ordering(el, i, current_sign=sign, other_mappers=other_mappers, output_mapper=output_mapper) + + +# functions used in State to know which element should be used for a specific axis + +def mapping_axis(state_inputs, mapper_rpn): + """Having inputs and mapper (in rpn notation), functions returns the axes of output for every input.""" + axis_for_input = {} + stack = [] + current_axis = None + current_shape = None + #pdb.set_trace() + for el in mapper_rpn: + if el == ".": + right = stack.pop() + left = stack.pop() + if left == "OUT": + if state_inputs[right].shape == current_shape: #todo:should we allow for one-element array? 
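+                    # "OUT" (the result built so far) and the new operand have the same shape, so the operand reuses the axes already assigned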
+ axis_for_input[right] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + elif right == "OUT": + if state_inputs[left].shape == current_shape: + axis_for_input[left] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + else: + if state_inputs[right].shape == state_inputs[left].shape: + current_axis = list(range(state_inputs[right].ndim)) + current_shape = state_inputs[left].shape + axis_for_input[left] = current_axis + axis_for_input[right] = current_axis + else: + raise Exception("arrays for scalar operations should have the same size") + + stack.append("OUT") + + elif el == "*": + right = stack.pop() + left = stack.pop() + if left == "OUT": + axis_for_input[right] = [i + 1 + current_axis[-1] + for i in range(state_inputs[right].ndim)] + current_axis = current_axis + axis_for_input[right] + current_shape = tuple([i for i in current_shape + state_inputs[right].shape]) + elif right == "OUT": + for key in axis_for_input: + axis_for_input[key] = [i + state_inputs[left].ndim + for i in axis_for_input[key]] + + axis_for_input[left] = [i - len(current_shape) + current_axis[-1] + 1 + for i in range(state_inputs[left].ndim)] + current_axis = current_axis + [i + 1 + current_axis[-1] + for i in range(state_inputs[left].ndim)] + current_shape = tuple([i for i in state_inputs[left].shape + current_shape]) + else: + axis_for_input[left] = list(range(state_inputs[left].ndim)) + axis_for_input[right] = [i + state_inputs[left].ndim + for i in range(state_inputs[right].ndim)] + current_axis = axis_for_input[left] + axis_for_input[right] + current_shape = tuple([i for i in + state_inputs[left].shape + state_inputs[right].shape]) + stack.append("OUT") + + else: + stack.append(el) + + if len(stack) == 0: + pass + elif len(stack) > 1: + raise Exception("exception from mapping_axis") + elif stack[0] != "OUT": + current_axis = [i for i in range(state_inputs[stack[0]].ndim)] + axis_for_input[stack[0]] = current_axis + + if current_axis: + ndim = max(current_axis) + 1 + else: + ndim = 0 + return axis_for_input, ndim + + +def converting_axis2input(state_inputs, axis_for_input, ndim): + """ Having axes for all the input fields, the function returns fields for each axis. """ + input_for_axis = [] + shape = [] + for i in range(ndim): + input_for_axis.append([]) + shape.append(0) + + for inp, axis in axis_for_input.items(): + for (i, ax) in enumerate(axis): + input_for_axis[ax].append(inp) + shape[ax] = state_inputs[inp].shape[i] + + return input_for_axis, shape + + +# used in the Node to change names in a mapper + +def change_mapper(mapper, name): + """changing names of mapper: adding names of the node""" + if isinstance(mapper, str): + if "." in mapper or mapper.startswith("_"): + return mapper + else: + return "{}.{}".format(name, mapper) + elif isinstance(mapper, list): + return _add_name(mapper, name) + elif isinstance(mapper, tuple): + mapper_l = list(mapper) + return tuple(_add_name(mapper_l, name)) + + +def _add_name(mlist, name): + for i, elem in enumerate(mlist): + if isinstance(elem, str): + if "." 
in elem or elem.startswith("_"): + pass + else: + mlist[i] = "{}.{}".format(name, mlist[i]) + elif isinstance(elem, list): + mlist[i] = _add_name(elem, name) + elif isinstance(elem, tuple): + mlist[i] = list(elem) + mlist[i] = _add_name(mlist[i], name) + mlist[i] = tuple(mlist[i]) + return mlist + + +#Function interface + +class FunctionInterface(object): + """ A new function interface """ + def __init__(self, function, output_nm, out_read=False, input_map=None): + self.function = function + if type(output_nm) is list: + self._output_nm = output_nm + else: + raise Exception("output_nm should be a list") + if not input_map: + self.input_map = {} + # TODO use signature + for key in inspect.getargspec(function)[0]: + if key not in self.input_map.keys(): + self.input_map[key] = key + # flags if we want to read the txt file to save in node.output + self.out_read = out_read + + + def run(self, input): + self.output = {} + if self.input_map: + for (key_fun, key_inp) in self.input_map.items(): + try: + input[key_fun] = input.pop(key_inp) + except KeyError: + raise Exception("no {} in the input dictionary".format(key_inp)) + fun_output = self.function(**input) + logger.debug("Function Interf, input={}, fun_out={}".format(input, fun_output)) + if type(fun_output) is tuple: + if len(self._output_nm) == len(fun_output): + for i, out in enumerate(fun_output): + self.output[self._output_nm[i]] = out + else: + raise Exception("length of output_nm doesnt match length of the function output") + elif len(self._output_nm)==1: + self.output[self._output_nm[0]] = fun_output + else: + raise Exception("output_nm doesnt match length of the function output") + + return fun_output + + +# want to use to access input as dot, +# but it doesnt work since im using "." within names (using my old syntax with - also cant work) +# https://stackoverflow.com/questions/2352181/how-to-use-a-dot-to-access-members-of-dictionary +class DotDict(dict): + """dot.notation access to dictionary attributes""" + def __getattr__(self, attr): + return self.get(attr) + __setattr__= dict.__setitem__ + __delattr__= dict.__delitem__ + + def __getstate__(self): + return self + + def __setstate__(self, state): + self.update(state) + self.__dict__ = self + + +class CurrentInterface(object): + def __init__(self, interface, name): + self.nn = Node(interface=interface, name=name) + self.output = {} + + def run(self, inputs, base_dir, dir_nm_el): + self.nn.base_dir = os.path.join(base_dir, dir_nm_el) + for key, val in inputs.items(): + key = key.split(".")[-1] + setattr(self.nn.inputs, key, val) + #have to set again self._output_dir in case of mapper + self.nn._output_dir = os.path.join(self.nn.base_dir, self.nn.name) + res = self.nn.run() + return res \ No newline at end of file diff --git a/nipype/pipeline/engine/state.py b/nipype/pipeline/engine/state.py new file mode 100644 index 0000000000..354b8d80af --- /dev/null +++ b/nipype/pipeline/engine/state.py @@ -0,0 +1,117 @@ +from collections import OrderedDict +import itertools +import pdb + +from . import auxiliary as aux + +class State(object): + def __init__(self, node_name, mapper=None, other_mappers=None): + self._mapper = mapper + self.node_name = node_name + if self._mapper: + # changing mapper (as in rpn), so I can read from left to right + # e.g. 
if mapper=('d', ['e', 'r']), _mapper_rpn=['d', 'e', 'r', '*', '.'] + self._mapper_rpn = aux.mapper2rpn(self._mapper, other_mappers=other_mappers) + self._input_names_mapper = [i for i in self._mapper_rpn if i not in ["*", "."]] + else: + self._mapper_rpn = [] + self._input_names_mapper = [] + + + def prepare_state_input(self, state_inputs): + """prepare all inputs, should be called once all input is available""" + + # dj TOTHINK: I actually stopped using state_inputs for now, since people wanted to have mapper not only + # for state inputs. Might have to come back.... + self.state_inputs = state_inputs + + # not all input field have to be use in the mapper, can be an extra scalar + self._input_names = list(self.state_inputs.keys()) + + # dictionary[key=input names] = list of axes related to + # e.g. {'r': [1], 'e': [0], 'd': [0, 1]} + # ndim - int, number of dimension for the "final array" (that is not created) + self._axis_for_input, self._ndim = aux.mapping_axis(self.state_inputs, self._mapper_rpn) + + # list of inputs variable for each axis + # e.g. [['e', 'd'], ['r', 'd']] + # shape - list, e.g. [2,3] + self._input_for_axis, self._shape = aux.converting_axis2input(self.state_inputs, + self._axis_for_input, self._ndim) + + # list of all possible indexes in each dim, will be use to iterate + # e.g. [[0, 1], [0, 1, 2]] + self.all_elements = [range(i) for i in self._shape] + self.index_generator = itertools.product(*self.all_elements) + + + def __getitem__(self, ind): + if type(ind) is int: + ind = (ind,) + return self.state_values(ind) + + # not used? + #@property + #def mapper(self): + # return self._mapper + + + @property + def ndim(self): + return self._ndim + + + @property + def shape(self): + return self._shape + + + def state_values(self, ind): + """returns state input as a dictionary (input name, value)""" + if len(ind) > self._ndim: + raise IndexError("too many indices") + + for ii, index in enumerate(ind): + if index > self._shape[ii] - 1: + raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii])) + + state_dict = {} + for input, ax in self._axis_for_input.items(): + # checking which axes are important for the input + sl_ax = slice(ax[0], ax[-1]+1) + # taking the indexes for the axes + ind_inp = tuple(ind[sl_ax]) #used to be list + state_dict[input] = self.state_inputs[input][ind_inp] + # adding values from input that are not used in the mapper + for input in set(self._input_names) - set(self._input_names_mapper): + state_dict[input] = self.state_inputs[input] + + # in py3.7 we can skip OrderedDict + # returning a named tuple? 
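+        # e.g. for mapper ["NA.a", "NA.b"] and ind = (1, 0): {"NA.a": state_inputs["NA.a"][1], "NA.b": state_inputs["NA.b"][0]}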
+ return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0])) + + + def state_ind(self, ind): + """similar to state value but returns indices (not values)""" + if len(ind) > self._ndim: + raise IndexError("too many indices") + + for ii, index in enumerate(ind): + if index > self._shape[ii] - 1: + raise IndexError("index {} is out of bounds for axis {} with size {}".format(index, ii, self._shape[ii])) + + state_dict = {} + for input, ax in self._axis_for_input.items(): + # checking which axes are important for the input + sl_ax = slice(ax[0], ax[-1]+1) + # taking the indexes for the axes + ind_inp = tuple(ind[sl_ax]) #used to be list + ind_inp_str = "x".join([str(el) for el in ind_inp]) + state_dict[input] = ind_inp_str + # adding inputs that are not used in the mapper + for input in set(self._input_names) - set(self._input_names_mapper): + state_dict[input] = None + + # in py3.7 we can skip OrderedDict + # returning a named tuple? + return OrderedDict(sorted(state_dict.items(), key=lambda t: t[0])) \ No newline at end of file diff --git a/nipype/pipeline/engine/submitter.py b/nipype/pipeline/engine/submitter.py new file mode 100644 index 0000000000..f24ab5a63b --- /dev/null +++ b/nipype/pipeline/engine/submitter.py @@ -0,0 +1,198 @@ +from __future__ import print_function, division, unicode_literals, absolute_import +from builtins import object + +from future import standard_library +standard_library.install_aliases() + +import os, pdb, time +from copy import deepcopy + +from .workers import MpWorker, SerialWorker, DaskWorker, ConcurrentFuturesWorker + +from ... import config, logging +logger = logging.getLogger('nipype.workflow') + + +class Submitter(object): + # TODO: runnable in init or run + def __init__(self, plugin, runnable): + self.plugin = plugin + self.node_line = [] + self._to_finish = [] # used only for wf + if self.plugin == "mp": + self.worker = MpWorker() + elif self.plugin == "serial": + self.worker = SerialWorker() + elif self.plugin == "dask": + self.worker = DaskWorker() + elif self.plugin == "cf": + self.worker = ConcurrentFuturesWorker() + else: + raise Exception("plugin {} not available".format(self.plugin)) + + if hasattr(runnable, 'interface'): # a node + self.node = runnable + elif hasattr(runnable, "graph"): # a workflow + self.workflow = runnable + else: + raise Exception("runnable has to be a Node or Workflow") + + + def run(self): + """main running method, checks if submitter id for Node or Workflow""" + if hasattr(self, "node"): + self.run_node() + elif hasattr(self, "workflow"): + self.run_workflow() + + + def run_node(self): + """the main method to run a Node""" + self.node.prepare_state_input() + self._submit_node(self.node) + while not self.node.is_complete: + logger.debug("Submitter, in while, to_finish: {}".format(self.node)) + time.sleep(3) + self.node.get_output() + + + def _submit_node(self, node): + """submitting nodes's interface for all states""" + for (i, ind) in enumerate(node.state.index_generator): + self._submit_node_el(node, i, ind) + + def _submit_node_el(self, node, i, ind): + """submitting node's interface for one element of states""" + logger.debug("SUBMIT WORKER, node: {}, ind: {}".format(node, ind)) + self.worker.run_el(node.run_interface_el, (i, ind)) + + + def run_workflow(self, workflow=None, ready=True): + """the main function to run Workflow""" + if not workflow: + workflow = self.workflow + workflow.prepare_state_input() + + # TODO: should I have inner_nodes for all workflow (to avoid if wf.mapper)?? 
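+        # a mapper on the workflow itself expands the whole workflow: one deep copy is created and run per state element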
+ if workflow.mapper: + for key in workflow._node_names.keys(): + workflow.inner_nodes[key] = [] + for (i, ind) in enumerate(workflow.state.index_generator): + new_workflow = deepcopy(workflow) + new_workflow.parent_wf = workflow + if ready: + self._run_workflow_el(new_workflow, i, ind) + else: + self.node_line.append((new_workflow, i, ind)) + else: + if ready: + if workflow.print_val: + workflow.preparing(wf_inputs=workflow.inputs) + else: + inputs_ind = dict((key, None) for (key, _) in workflow.inputs.items()) + workflow.preparing(wf_inputs=workflow.inputs, wf_inputs_ind=inputs_ind) + self._run_workflow_nd(workflow=workflow) + else: + self.node_line.append((workflow, 0, ())) + + # this parts submits nodes that are waiting to be run + # it should stop when nothing is waiting + while self._nodes_check(): + logger.debug("Submitter, in while, node_line: {}".format(self.node_line)) + time.sleep(3) + + # this part simply waiting for all "last nodes" to finish + while self._output_check(): + logger.debug("Submitter, in while, to_finish: {}".format(self._to_finish)) + time.sleep(3) + + # calling only for the main wf (other wf will be called inside the function) + if workflow is self.workflow: + workflow.get_output() + + + def _run_workflow_el(self, workflow, i, ind, collect_inp=False): + """running one internal workflow (if workflow has a mapper)""" + # TODO: can I simplify and remove collect inp? where should it be? + if collect_inp: + st_inputs, wf_inputs = workflow.get_input_el(ind) + else: + wf_inputs = workflow.state.state_values(ind) + if workflow.print_val: + workflow.preparing(wf_inputs=wf_inputs) + else: + wf_inputs_ind = workflow.state.state_ind(ind) + workflow.preparing(wf_inputs=wf_inputs, wf_inputs_ind=wf_inputs_ind) + self._run_workflow_nd(workflow=workflow) + + + def _run_workflow_nd(self, workflow): + """iterating over all nodes from a workflow and submitting them or adding to the node_line""" + for (i_n, node) in enumerate(workflow.graph_sorted): + if workflow.parent_wf and workflow.parent_wf.mapper: # for now if parent_wf, parent_wf has to have mapper + workflow.parent_wf.inner_nodes[node.name].append(node) + node.prepare_state_input() + self._to_finish.append(node) + # submitting all the nodes who are self sufficient (self.workflow.graph is already sorted) + if node.ready2run: + if hasattr(node, 'interface'): + self._submit_node(node) + else: # it's workflow + self.run_workflow(workflow=node) + # if its not, its been added to a line + else: + break + # in case there is no element in the graph that goes to the break + # i want to be sure that not calculating the last node again in the next for loop + if i_n == len(workflow.graph_sorted) - 1: + i_n += 1 + + # all nodes that are not self sufficient (not ready to run) will go to the line + # iterating over all elements + for nn in list(workflow.graph_sorted)[i_n:]: + if hasattr(nn, 'interface'): + for (i, ind) in enumerate(nn.state.index_generator): + self._to_finish.append(nn) + self.node_line.append((nn, i, ind)) + else: #wf + self.run_workflow(workflow=nn, ready=False) + + + def _nodes_check(self): + """checking which nodes-states are ready to run and running the ones that are ready""" + _to_remove = [] + for (to_node, i, ind) in self.node_line: + if hasattr(to_node, 'interface'): + print("_NODES_CHECK INPUT", to_node.name, to_node.checking_input_el(ind)) + if to_node.checking_input_el(ind): + self._submit_node_el(to_node, i, ind) + _to_remove.append((to_node, i, ind)) + else: + pass + else: #wf + if 
to_node.checking_input_el(ind): + self._run_workflow_el(workflow=to_node, i=i, ind=ind, collect_inp=True) + _to_remove.append((to_node, i, ind)) + else: + pass + + for rn in _to_remove: + self.node_line.remove(rn) + return self.node_line + + + # this I believe can be done for entire node + def _output_check(self): + """"checking if all nodes are done""" + _to_remove = [] + for node in self._to_finish: + print("_output check node", node, node.name, node.is_complete) + if node.is_complete: + _to_remove.append(node) + for rn in _to_remove: + self._to_finish.remove(rn) + return self._to_finish + + + def close(self): + self.worker.close() diff --git a/nipype/pipeline/engine/tests/test_auxiliary.py b/nipype/pipeline/engine/tests/test_auxiliary.py new file mode 100644 index 0000000000..4c5df1df2f --- /dev/null +++ b/nipype/pipeline/engine/tests/test_auxiliary.py @@ -0,0 +1,84 @@ +from .. import auxiliary as aux + +import numpy as np +import pytest + +@pytest.mark.parametrize("mapper, rpn", + [ + ("a", ["a"]), + (("a", "b"), ["a", "b", "."]), + (["a", "b"], ["a", "b", "*"]), + (["a", ("b", "c")], ["a", "b", "c", ".", "*"]), + ([("a", "b"), "c"], ["a", "b", ".", "c", "*"]), + (["a", ("b", ["c", "d"])], ["a", "b", "c", "d", "*", ".", "*"]) + ]) +def test_mapper2rpn(mapper, rpn): + assert aux.mapper2rpn(mapper) == rpn + + +@pytest.mark.parametrize("mapper, other_mappers, rpn", + [ + (["a", "_NA"], {"NA": ("b", "c")}, ["a", "NA.b", "NA.c", ".", "*"]), + (["_NA", "c"], {"NA": ("a", "b")}, ["NA.a", "NA.b", ".", "c", "*"]), + (["a", ("b", "_NA")], {"NA": ["c", "d"]}, ["a", "b", "NA.c", "NA.d", "*", ".", "*"]) + ]) + +def test_mapper2rpn_wf_mapper(mapper, other_mappers, rpn): + assert aux.mapper2rpn(mapper, other_mappers=other_mappers) == rpn + + +@pytest.mark.parametrize("mapper, mapper_changed", + [ + ("a", "Node.a"), + (["a", ("b", "c")], ["Node.a", ("Node.b", "Node.c")]), + (("a", ["b", "c"]), ("Node.a", ["Node.b", "Node.c"])) + ]) +def test_change_mapper(mapper, mapper_changed): + assert aux.change_mapper(mapper, "Node") == mapper_changed + + +@pytest.mark.parametrize("inputs, rpn, expected", + [ + ({"a": np.array([1, 2])}, ["a"], {"a": [0]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4])}, ["a", "b", "."], {"a": [0], "b": [0]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4, 1])}, ["a", "b", "*"], {"a": [0], "b": [1]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, ["a", "b", ".", "c", "*"], + {"a": [0], "b": [0], "c": [1]}), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, + ["c", "a", "b", ".", "*"], {"a": [1], "b": [1], "c": [0]}), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])}, + ["a", "b", ".", "c", "*"], {"a": [0, 1], "b": [0, 1], "c": [2]}), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), + "c": np.array([1, 2, 3])}, ["c", "a", "b", ".", "*"], {"a": [1, 2], "b": [1, 2], "c": [0]}) + ]) +def test_mapping_axis(inputs, rpn, expected): + res = aux.mapping_axis(inputs, rpn)[0] + print(res) + for key in inputs.keys(): + assert res[key] == expected[key] + + +def test_mapping_axis_error(): + with pytest.raises(Exception): + aux.mapping_axis({"a": np.array([1, 2]), "b": np.array([3, 4, 5])}, ["a", "b", "."]) + + +@pytest.mark.parametrize("inputs, axis_inputs, ndim, expected", + [ + ({"a": np.array([1, 2])}, {"a": [0]}, 1, [["a"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4])}, {"a": [0], "b": [0]}, 1, + [["a", "b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4, 
1])}, {"a": [0], "b": [1]}, 2, + [["a"], ["b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, + {"a": [0], "b": [0], "c": [1]}, 2, [["a", "b"]]), + ({"a": np.array([1, 2]), "b": np.array([3, 4]), "c": np.array([1, 2, 3])}, + {"a": [1], "b": [1], "c": [0]}, 2, [["c"], ["a", "b"]]), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), "c": np.array([1, 2, 3])}, + {"a": [0, 1], "b": [0, 1], "c": [2]}, 3, [["a", "b"], ["a", "b"], ["c"]]), + ({"a": np.array([[1, 2], [1, 2]]), "b": np.array([[3, 4], [3, 3]]), + "c": np.array([1, 2, 3])}, {"a": [1, 2], "b": [1, 2], "c": [0]}, 3, + [["c"], ["a", "b"], ["a", "b"]]) + ]) +def test_converting_axis2input(inputs, axis_inputs, ndim, expected): + aux.converting_axis2input(inputs, axis_inputs, ndim)[0] == expected diff --git a/nipype/pipeline/engine/tests/test_newnode.py b/nipype/pipeline/engine/tests/test_newnode.py new file mode 100644 index 0000000000..9bee85c5ac --- /dev/null +++ b/nipype/pipeline/engine/tests/test_newnode.py @@ -0,0 +1,1460 @@ +from ....utils.filemanip import save_json, makedirs, to_str +from ....interfaces import fsl + +from .. import NewNode, NewWorkflow +from ..auxiliary import FunctionInterface, CurrentInterface +from ..submitter import Submitter + +import sys, time, os +import numpy as np +import pytest, pdb + +python35_only = pytest.mark.skipif(sys.version_info < (3, 5), + reason="requires Python>3.4") + +@pytest.fixture(scope="module") +def change_dir(request): + orig_dir = os.getcwd() + test_dir = os.path.join(orig_dir, "test_outputs") + makedirs(test_dir, exist_ok=True) + os.chdir(test_dir) + + def move2orig(): + os.chdir(orig_dir) + + request.addfinalizer(move2orig) + + +Plugins = ["serial"] +Plugins = ["serial", "mp", "cf", "dask"] + +def fun_addtwo(a): + time.sleep(1) + if a == 3: + time.sleep(2) + return a + 2 + +def fun_addvar(a, b): + return a + b + + +def test_node_1(): + """Node with mandatory arguments only""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo) + assert nn.mapper is None + assert nn.inputs == {} + assert nn.state._mapper is None + + +def test_node_2(): + """Node with interface and inputs""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": 3}) + assert nn.mapper is None + # adding NA to the name of the variable + assert nn.inputs == {"NA.a": 3} + assert nn.state._mapper is None + + +def test_node_3(): + """Node with interface, inputs and mapper""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": [3, 5]}, mapper="a") + assert nn.mapper == "NA.a" + assert (nn.inputs["NA.a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "NA.a" + + nn.prepare_state_input() + assert nn.state.state_values([0]) == {"NA.a": 3} + assert nn.state.state_values([1]) == {"NA.a": 5} + + +def test_node_4(): + """Node with interface and inputs. 
mapper set using map method""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo, inputs={"a": [3, 5]}) + nn.map(mapper="a") + assert nn.mapper == "NA.a" + assert (nn.inputs["NA.a"] == np.array([3, 5])).all() + + nn.prepare_state_input() + assert nn.state._mapper == "NA.a" + assert nn.state.state_values([0]) == {"NA.a": 3} + assert nn.state.state_values([1]) == {"NA.a": 5} + + +def test_node_4a(): + """Node with interface, mapper and inputs set with the map method""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo) + nn.map(mapper="a", inputs={"a": [3, 5]}) + assert nn.mapper == "NA.a" + assert (nn.inputs["NA.a"] == np.array([3, 5])).all() + + assert nn.state._mapper == "NA.a" + nn.prepare_state_input() + assert nn.state.state_values([0]) == {"NA.a": 3} + assert nn.state.state_values([1]) == {"NA.a": 5} + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_node_5(plugin, change_dir): + """Node with interface and inputs, no mapper, running interface""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", inputs={"a": 3}, interface=interf_addtwo, + workingdir="test_nd5_{}".format(plugin)) + + assert (nn.inputs["NA.a"] == np.array([3])).all() + + sub = Submitter(plugin=plugin, runnable=nn) + sub.run() + sub.close() + + # checking the results + expected = [({"NA.a": 3}, 5)] + # to be sure that there is the same order (not sure if node itself should keep the order) + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + + for i, res in enumerate(expected): + assert nn.result["out"][i][0] == res[0] + assert nn.result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_node_6(plugin, change_dir): + """Node with interface, inputs and the simplest mapper, running interface""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + nn = NewNode(name="NA", interface=interf_addtwo, workingdir="test_nd6_{}".format(plugin)) + nn.map(mapper="a", inputs={"a": [3, 5]}) + + assert nn.mapper == "NA.a" + assert (nn.inputs["NA.a"] == np.array([3, 5])).all() + + sub = Submitter(plugin=plugin, runnable=nn) + sub.run() + sub.close() + + # checking the results + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + # to be sure that there is the same order (not sure if node itself should keep the order) + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + + for i, res in enumerate(expected): + assert nn.result["out"][i][0] == res[0] + assert nn.result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_node_7(plugin, change_dir): + """Node with interface, inputs and scalar mapper, running interface""" + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nn = NewNode(name="NA", interface=interf_addvar, workingdir="test_nd7_{}".format(plugin)) + # scalar mapper + nn.map(mapper=("a", "b"), inputs={"a": [3, 5], "b": [2, 1]}) + + assert nn.mapper == ("NA.a", "NA.b") + assert (nn.inputs["NA.a"] == np.array([3, 5])).all() + assert (nn.inputs["NA.b"] == np.array([2, 1])).all() + + sub = Submitter(plugin=plugin, runnable=nn) + sub.run() + sub.close() + + # checking the results + expected = [({"NA.a": 3, "NA.b": 2}, 5), 
({"NA.a": 5, "NA.b": 1}, 6)] + # to be sure that there is the same order (not sure if node itself should keep the order) + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + + for i, res in enumerate(expected): + assert nn.result["out"][i][0] == res[0] + assert nn.result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_node_8(plugin, change_dir): + """Node with interface, inputs and vector mapper, running interface""" + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nn = NewNode(name="NA", interface=interf_addvar, workingdir="test_nd8_{}".format(plugin)) + # [] for outer product + nn.map(mapper=["a", "b"], inputs={"a": [3, 5], "b": [2, 1]}) + + assert nn.mapper == ["NA.a", "NA.b"] + assert (nn.inputs["NA.a"] == np.array([3, 5])).all() + assert (nn.inputs["NA.b"] == np.array([2, 1])).all() + + sub = Submitter(plugin=plugin, runnable=nn) + sub.run() + sub.close() + + # checking teh results + expected = [({"NA.a": 3, "NA.b": 1}, 4), ({"NA.a": 3, "NA.b": 2}, 5), + ({"NA.a": 5, "NA.b": 1}, 6), ({"NA.a": 5, "NA.b": 2}, 7)] + # to be sure that there is the same order (not sure if node itself should keep the order) + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + nn.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert nn.result["out"][i][0] == res[0] + assert nn.result["out"][i][1] == res[1] + + +# tests for workflows + +@python35_only +def test_workflow_0(plugin="serial"): + """workflow (without run) with one node with a mapper""" + wf = NewWorkflow(name="wf0", workingdir="test_wf0_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + # defining a node with mapper and inputs first + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + # one of the way of adding nodes to the workflow + wf.add_nodes([na]) + assert wf.nodes[0].mapper == "NA.a" + assert (wf.nodes[0].inputs['NA.a'] == np.array([3, 5])).all() + assert len(wf.graph.nodes) == 1 + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_1(plugin, change_dir): + """workflow with one node with a mapper""" + wf = NewWorkflow(name="wf1", workingdir="test_wf1_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + wf.add_nodes([na]) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_2(plugin, change_dir): + """workflow with two nodes, second node without mapper""" + wf = NewWorkflow(name="wf2", workingdir="test_wf2_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + + # the second node does not have 
explicit mapper (but keeps the mapper from the NA node) + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, inputs={"b": 10}, workingdir="nb") + + # adding 2 nodes and create a connection (as it is now) + wf.add_nodes([na, nb]) + wf.connect("NA", "out", "NB", "a") + assert wf.nodes[0].mapper == "NA.a" + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected_A = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected_A[0][0].keys()) + expected_A.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_A): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + # results from NB keeps the "state input" from the first node + # two elements as in NA + expected_B = [({"NA.a": 3, "NB.b": 10}, 15), ({"NA.a": 5, "NB.b": 10}, 17)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_2a(plugin, change_dir): + """workflow with two nodes, second node with a scalar mapper""" + wf = NewWorkflow(name="wf2", workingdir="test_wf2a_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + # explicit scalar mapper between "a" from NA and b + nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + + wf.add_nodes([na, nb]) + wf.connect("NA", "out", "NB", "a") + + assert wf.nodes[0].mapper == "NA.a" + assert wf.nodes[1].mapper == ("NA.a", "NB.b") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected_A = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected_A[0][0].keys()) + expected_A.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_A): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + # two elements (scalar mapper) + expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_2b(plugin): + """workflow with two nodes, second node with a vector mapper""" + wf = NewWorkflow(name="wf2", workingdir="test_wf2b_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + # outer 
mapper + nb.map(mapper=["NA.a", "b"], inputs={"b": [2, 1]}) + + wf.add_nodes([na, nb]) + wf.connect("NA", "out", "NB", "a") + + assert wf.nodes[0].mapper == "NA.a" + assert wf.nodes[1].mapper == ["NA.a", "NB.b"] + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected_A = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected_A[0][0].keys()) + expected_A.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_A): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + # four elements (outer product) + expected_B = [({"NA.a": 3, "NB.b": 1}, 6), ({"NA.a": 3, "NB.b": 2}, 7), + ({"NA.a": 5, "NB.b": 1}, 8), ({"NA.a": 5, "NB.b": 2}, 9)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +# using add method to add nodes + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_3(plugin, change_dir): + """using add(node) method""" + wf = NewWorkflow(name="wf3", workingdir="test_wf3_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + # using add method (as in the Satra's example) with a node + wf.add(na) + + assert wf.nodes[0].mapper == "NA.a" + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_3a(plugin, change_dir): + """using add(interface) method""" + wf = NewWorkflow(name="wf3a", workingdir="test_wf3a_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + + # using the add method with an interface + wf.add(interf_addtwo, workingdir="na", mapper="a", inputs={"a": [3, 5]}, name="NA") + + assert wf.nodes[0].mapper == "NA.a" + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_3b(plugin, change_dir): + """using add (function) method""" + wf = NewWorkflow(name="wf3b", workingdir="test_wf3b_{}".format(plugin)) + # using the add method with a function + wf.add(fun_addtwo, workingdir="na", mapper="a", inputs={"a": [3, 5]}, name="NA") + + assert wf.nodes[0].mapper == "NA.a" + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + 
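+    # execution order is not guaranteed, so expected and actual results are sorted by their state-input values before comparing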
key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_4(plugin, change_dir): + """ using add(node) method + using wf.connect to connect two nodes + """ + wf = NewWorkflow(name="wf4", workingdir="test_wf4_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + wf.add(na) + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + # explicit mapper with a variable from the previous node + # providing inputs with b + nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + wf.add(nb) + # connect method as it is in the current version + wf.connect("NA", "out", "NB", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_4a(plugin, change_dir): + """ using add(node) method with kwarg arg to connect nodes (instead of wf.connect) """ + wf = NewWorkflow(name="wf4a", workingdir="test_wf4a_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + wf.add(na) + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + # explicit mapper with a variable from the previous node + nb.map(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + # instead of "connect", using kwrg argument in the add method as in the example + wf.add(nb, a="NA.out") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert 
wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + + +# using map after add method + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_5(plugin, change_dir): + """using a map method for one node""" + wf = NewWorkflow(name="wf5", workingdir="test_wf5_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + + wf.add(na) + # using the map method after add (using mapper for the last added node as default) + wf.map_node(mapper="a", inputs={"a": [3, 5]}) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_5a(plugin, change_dir): + """using a map method for one node (using add and map in one chain)""" + wf = NewWorkflow(name="wf5a", workingdir="test_wf5a_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + + wf.add(na).map_node(mapper="a", inputs={"a": [3, 5]}) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_6(plugin, change_dir): + """using a map method for two nodes (using last added node as default)""" + wf = NewWorkflow(name="wf6", workingdir="test_wf6_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + # using the map methods after add (using mapper for the last added nodes as default) + wf.add(na) + wf.map_node(mapper="a", inputs={"a": [3, 5]}) + wf.add(nb) + wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + wf.connect("NA", "out", "NB", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert 
wf.nodes[1].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_6a(plugin, change_dir): + """using a map method for two nodes (specifying the node)""" + wf = NewWorkflow(name="wf6a", workingdir="test_wf6a_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + # using the map method after add (specifying the node) + wf.add(na) + wf.add(nb) + wf.map_node(mapper="a", inputs={"a": [3, 5]}, node=na) + # TODO: should we se ("a", "c") instead?? shold I forget "NA.a" value? + wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}, node=nb) + wf.connect("NA", "out", "NB", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_6b(plugin, change_dir): + """using a map method for two nodes (specifying the node), using kwarg arg instead of connect""" + wf = NewWorkflow(name="wf6b", workingdir="test_wf6b_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + + wf.add(na) + wf.add(nb, a="NA.out") + wf.map_node(mapper="a", inputs={"a": [3, 5]}, node=na) + wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}, node=nb) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +# tests for a workflow that have its own input + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_7(plugin, change_dir): + """using inputs for workflow and connect_workflow""" + # adding inputs to the workflow directly + wf = NewWorkflow(name="wf7", 
inputs={"wfa": [3, 5]}, workingdir="test_wf7_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + + wf.add(na) + # connecting the node with inputs from the workflow + wf.connect_wf_input("wfa", "NA", "a") + wf.map_node(mapper="a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_7a(plugin, change_dir): + """using inputs for workflow and kwarg arg in add (instead of connect)""" + wf = NewWorkflow(name="wf7a", inputs={"wfa": [3, 5]}, workingdir="test_wf7a_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + # using kwrg argument in the add method (instead of connect or connect_wf_input + wf.add(na, a="wfa") + wf.map_node(mapper="a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_8(plugin, change_dir): + """using inputs for workflow and connect_wf_input for the second node""" + wf = NewWorkflow(name="wf8", workingdir="test_wf8_{}".format(plugin), inputs={"b": 10}) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + + wf.add_nodes([na, nb]) + wf.connect("NA", "out", "NB", "a") + wf.connect_wf_input("b", "NB", "b") + assert wf.nodes[0].mapper == "NA.a" + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected_A = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected_A[0][0].keys()) + expected_A.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_A): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + + expected_B = [({"NA.a": 3, "NB.b": 10}, 15), ({"NA.a": 5, "NB.b": 10}, 17)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +# testing if _NA in mapper works, using interfaces in add + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_9(plugin, change_dir): + """using add(interface) method and mapper from 
previous nodes""" + wf = NewWorkflow(name="wf9", workingdir="test_wf9_{}".format(plugin)) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + wf.add(name="NA", runnable=interf_addtwo, workingdir="na").map_node(mapper="a", inputs={"a": [3, 5]}) + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + # _NA means that I'm using mapper from the NA node, it's the same as ("NA.a", "b") + wf.add(name="NB", runnable=interf_addvar, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [2, 1]}) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_10(plugin, change_dir): + """using add(interface) method and scalar mapper from previous nodes""" + wf = NewWorkflow(name="wf10", workingdir="test_wf10_{}".format(plugin)) + interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) + wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]}) + interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) + # _NA means that I'm using mapper from the NA node, it's the same as (("NA.a", NA.b), "b") + wf.add(name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [2, 1]}) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3, "NA.b": 0}, 3), ({"NA.a": 5, "NA.b": 10}, 15)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NA.b": 0, "NB.b": 2}, 5), ({"NA.a": 5, "NA.b": 10, "NB.b": 1}, 16)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_10a(plugin, change_dir): + """using add(interface) method and vector mapper from previous nodes""" + wf = NewWorkflow(name="wf10a", workingdir="test_wf10a_{}".format(plugin)) + interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) + wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=["a", "b"], inputs={"a": [3, 5], "b": [0, 10]}) + interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) + # _NA means that I'm using mapper from the NA node, it's the 
same as (["NA.a", NA.b], "b") + wf.add(name="NB", runnable=interf_addvar2, workingdir="nb", a="NA.out").map_node(mapper=("_NA", "b"), inputs={"b": [[2, 1], [0, 0]]}) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3, "NA.b": 0}, 3), ({"NA.a": 3, "NA.b": 10}, 13), + ({"NA.a": 5, "NA.b": 0}, 5), ({"NA.a": 5, "NA.b": 10}, 15)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NA.b": 0, "NB.b": 2}, 5), ({"NA.a": 3, "NA.b": 10, "NB.b": 1}, 14), + ({"NA.a": 5, "NA.b": 0, "NB.b": 0}, 5), ({"NA.a": 5, "NA.b": 10, "NB.b": 0}, 15)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[1].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.nodes[1].result["out"][i][0] == res[0] + assert wf.nodes[1].result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_11(plugin, change_dir): + """using add(interface) method and vector mapper from previous two nodes""" + wf = NewWorkflow(name="wf11", workingdir="test_wf11_{}".format(plugin)) + interf_addvar1 = FunctionInterface(fun_addvar, ["out"]) + wf.add(name="NA", runnable=interf_addvar1, workingdir="na").map_node(mapper=("a", "b"), inputs={"a": [3, 5], "b": [0, 10]}) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + wf.add(name="NB", runnable=interf_addtwo, workingdir="nb").map_node(mapper="a", inputs={"a": [2, 1]}) + interf_addvar2 = FunctionInterface(fun_addvar, ["out"]) + # _NA, _NB means that I'm using mappers from the NA/NB nodes, it's the same as [("NA.a", NA.b), "NB.a"] + wf.add(name="NC", runnable=interf_addvar2, workingdir="nc", a="NA.out", b="NB.out").map_node(mapper=["_NA", "_NB"]) # TODO: this should eb default? 
+ + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + expected = [({"NA.a": 3, "NA.b": 0}, 3), ({"NA.a": 5, "NA.b": 10}, 15)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[0].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected): + assert wf.nodes[0].result["out"][i][0] == res[0] + assert wf.nodes[0].result["out"][i][1] == res[1] + + + expected_C = [({"NA.a": 3, "NA.b": 0, "NB.a": 1}, 6), ({"NA.a": 3, "NA.b": 0, "NB.a": 2}, 7), + ({"NA.a": 5, "NA.b": 10, "NB.a": 1}, 18), ({"NA.a": 5, "NA.b": 10, "NB.a": 2}, 19)] + key_sort = list(expected_C[0][0].keys()) + expected_C.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.nodes[2].result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_C): + assert wf.nodes[2].result["out"][i][0] == res[0] + assert wf.nodes[2].result["out"][i][1] == res[1] + + +# checking workflow.result + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_12(plugin, change_dir): + """testing if wf.result works (the same workflow as in test_workflow_6)""" + wf = NewWorkflow(name="wf12", workingdir="test_wf12_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out"), ("NB", "out")]) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + # using the map methods after add (using mapper for the last added nodes as default) + wf.add(na) + wf.map_node(mapper="a", inputs={"a": [3, 5]}) + wf.add(nb) + wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + wf.connect("NA", "out", "NB", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + # checking if workflow.results is the same as results of nodes + assert wf.result["NA_out"] == wf.nodes[0].result["out"] + assert wf.result["out"] == wf.nodes[1].result["out"] + + # checking values of workflow.result + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + key_sort = list(expected[0][0].keys()) + expected.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.result["NA_out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + #pdb.set_trace() + assert wf.is_complete + for i, res in enumerate(expected): + assert wf.result["NA_out"][i][0] == res[0] + assert wf.result["NA_out"][i][1] == res[1] + + expected_B = [({"NA.a": 3, "NB.b": 2}, 7), ({"NA.a": 5, "NB.b": 1}, 8)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.result["out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.result["out"][i][0] == res[0] + assert wf.result["out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_12a(plugin, change_dir): + """testing if wf.result raises exceptione (the same workflow as in test_workflow_6)""" + wf = NewWorkflow(name="wf12a", workingdir="test_wf12a_{}".format(plugin), + wf_output_names=[("NA", "out", "wf_out"), ("NB", "out", "wf_out")]) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + # using the map methods after add (using 
mapper for the last added nodes as default) + wf.add(na) + wf.map_node(mapper="a", inputs={"a": [3, 5]}) + wf.add(nb) + wf.map_node(mapper=("NA.a", "b"), inputs={"b": [2, 1]}) + wf.connect("NA", "out", "NB", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + # wf_out can't be used twice + with pytest.raises(Exception) as exinfo: + sub.run() + assert str(exinfo.value) == "the key wf_out is already used in workflow.result" + +# tests for a workflow that have its own input and mapper + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_13(plugin, change_dir): + """using inputs for workflow and connect_wf_input""" + wf = NewWorkflow(name="wf13", inputs={"wfa": [3, 5]}, mapper="wfa", workingdir="test_wf13_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out")]) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + wf.add(na) + wf.connect_wf_input("wfa", "NA", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"wf13.wfa": 3}, [({"NA.a": 3}, 5)]), + ({'wf13.wfa': 5}, [({"NA.a": 5}, 7)])] + for i, res in enumerate(expected): + assert wf.result["NA_out"][i][0] == res[0] + assert wf.result["NA_out"][i][1][0][0] == res[1][0][0] + assert wf.result["NA_out"][i][1][0][1] == res[1][0][1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_13a(plugin, change_dir): + """using inputs for workflow and connect_wf_input (the node has 2 inputs)""" + wf = NewWorkflow(name="wf13a", inputs={"wfa": [3, 5]}, mapper="wfa", workingdir="test_wf13a_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out")]) + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + na = NewNode(name="NA", interface=interf_addvar, workingdir="na", mapper="b", inputs={"b": [10, 20]}) + wf.add(na) + wf.connect_wf_input("wfa", "NA", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"wf13a.wfa": 3}, [({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 3, "NA.b": 20}, 23)]), + ({'wf13a.wfa': 5}, [({"NA.a": 5, "NA.b": 10}, 15), ({"NA.a": 5, "NA.b": 20}, 25)])] + for i, res in enumerate(expected): + assert wf.result["NA_out"][i][0] == res[0] + for j in range(len(res[1])): + assert wf.result["NA_out"][i][1][j][0] == res[1][j][0] + assert wf.result["NA_out"][i][1][j][1] == res[1][j][1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_13c(plugin, change_dir): + """using inputs for workflow and connect_wf_input, using wf.map(mapper, inputs)""" + wf = NewWorkflow(name="wf13c", workingdir="test_wf13c_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out")]) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + wf.add(na).map(mapper="wfa", inputs={"wfa": [3, 5]}) + wf.connect_wf_input("wfa", "NA", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"wf13c.wfa": 3}, [({"NA.a": 3}, 5)]), + ({'wf13c.wfa': 5}, [({"NA.a": 5}, 7)])] + for i, res in enumerate(expected): + assert wf.result["NA_out"][i][0] == res[0] + assert wf.result["NA_out"][i][1][0][0] == res[1][0][0] + assert wf.result["NA_out"][i][1][0][1] == res[1][0][1] + + @pytest.mark.parametrize("plugin", Plugins) + @python35_only + def test_workflow_13b(plugin, change_dir): + """using inputs for workflow and connect_wf_input, using 
wf.map(mapper)""" + wf = NewWorkflow(name="wf13b", inputs={"wfa": [3, 5]}, + workingdir="test_wf13b_{}".format(plugin), + wf_output_names=[("NA", "out", "NA_out")]) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + wf.add(na).map(mapper="wfa") + wf.connect_wf_input("wfa", "NA", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"wf13b.wfa": 3}, [({"NA.a": 3}, 5)]), + ({'wf13b.wfa': 5}, [({"NA.a": 5}, 7)])] + for i, res in enumerate(expected): + assert wf.result["NA_out"][i][0] == res[0] + assert wf.result["NA_out"][i][1][0][0] == res[1][0][0] + assert wf.result["NA_out"][i][1][0][1] == res[1][0][1] + + +# workflow as a node + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_14(plugin, change_dir): + """workflow with a workflow as a node (no mapper)""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": 3}) + wfa = NewWorkflow(name="wfa", workingdir="test_wfa", + wf_output_names=[("NA", "out", "NA_out")]) + wfa.add(na) + + wf = NewWorkflow(name="wf14", workingdir="test_wf14_{}".format(plugin), + wf_output_names=[("wfa", "NA_out", "wfa_out")]) + wf.add(wfa) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"NA.a": 3}, 5)] + for i, res in enumerate(expected): + assert wf.result["wfa_out"][i][0] == res[0] + assert wf.result["wfa_out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_14a(plugin, change_dir): + """workflow with a workflow as a node (no mapper, using connect_wf_input in wfa)""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + wfa = NewWorkflow(name="wfa", workingdir="test_wfa", inputs={"a": 3}, + wf_output_names=[("NA", "out", "NA_out")]) + wfa.add(na) + wfa.connect_wf_input("a", "NA", "a") + + wf = NewWorkflow(name="wf14a", workingdir="test_wf14a_{}".format(plugin), + wf_output_names=[("wfa", "NA_out", "wfa_out")]) + wf.add(wfa) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"NA.a": 3}, 5)] + for i, res in enumerate(expected): + assert wf.result["wfa_out"][i][0] == res[0] + assert wf.result["wfa_out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_14b(plugin, change_dir): + """workflow with a workflow as a node (no mapper, using connect_wf_input in wfa and wf)""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + wfa = NewWorkflow(name="wfa", workingdir="test_wfa", + wf_output_names=[("NA", "out", "NA_out")]) + wfa.add(na) + wfa.connect_wf_input("a", "NA", "a") + + wf = NewWorkflow(name="wf14b", workingdir="test_wf14b_{}".format(plugin), + wf_output_names=[("wfa", "NA_out", "wfa_out")], inputs={"a": 3}) + wf.add(wfa) + wf.connect_wf_input("a", "wfa", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"NA.a": 3}, 5)] + for i, res in enumerate(expected): + assert wf.result["wfa_out"][i][0] == res[0] + assert wf.result["wfa_out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_15(plugin, change_dir): + """workflow 
with a workflow as a node with mapper (like 14 but with a mapper)""" + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na", + inputs={"a": [3, 5]}, mapper="a") + wfa = NewWorkflow(name="wfa", workingdir="test_wfa", + wf_output_names=[("NA", "out", "NA_out")]) + wfa.add(na) + + wf = NewWorkflow(name="wf15", workingdir="test_wf15_{}".format(plugin), + wf_output_names=[("wfa", "NA_out", "wfa_out")]) + wf.add(wfa) + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + for i, res in enumerate(expected): + assert wf.result["wfa_out"][i][0] == res[0] + assert wf.result["wfa_out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_16(plugin, change_dir): + """workflow with two nodes, and one is a workflow (no mapper)""" + wf = NewWorkflow(name="wf16", workingdir="test_wf16_{}".format(plugin), + wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na", inputs={"a": 3}) + wf.add(na) + + # the second node does not have explicit mapper (but keeps the mapper from the NA node) + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + wfb = NewWorkflow(name="wfb", workingdir="test_wfb", inputs={"b": 10}, + wf_output_names=[("NB", "out", "NB_out")]) + wfb.add(nb) + wfb.connect_wf_input("b", "NB", "b") + wfb.connect_wf_input("a", "NB", "a") + + wf.add(wfb) + wf.connect("NA", "out", "wfb", "a") + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + expected_A = [({"NA.a": 3}, 5)] + for i, res in enumerate(expected_A): + assert wf.result["NA_out"][i][0] == res[0] + assert wf.result["NA_out"][i][1] == res[1] + + # TODO: the naming rememebrs only the node, doesnt remember that a came from NA... + # the naming should have names with workflows?? 
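+    # As a consequence of the naming issue above, the state keys in expected_B below are
+    # "NB.a"/"NB.b" (not qualified by the inner workflow or by NA), and NB.a carries the
+    # value received from NA.out (3 + 2 = 5).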
+ expected_B = [({"NB.a": 5, "NB.b": 10}, 15)] + for i, res in enumerate(expected_B): + assert wf.result["NB_out"][i][0] == res[0] + assert wf.result["NB_out"][i][1] == res[1] + + +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_workflow_16a(plugin, change_dir): + """workflow with two nodes, and one is a workflow (with mapper)""" + wf = NewWorkflow(name="wf16a", workingdir="test_wf16a_{}".format(plugin), + wf_output_names=[("wfb", "NB_out"), ("NA", "out", "NA_out")]) + interf_addtwo = FunctionInterface(fun_addtwo, ["out"]) + na = NewNode(name="NA", interface=interf_addtwo, workingdir="na") + na.map(mapper="a", inputs={"a": [3, 5]}) + wf.add(na) + + # the second node does not have explicit mapper (but keeps the mapper from the NA node) + interf_addvar = FunctionInterface(fun_addvar, ["out"]) + nb = NewNode(name="NB", interface=interf_addvar, workingdir="nb") + wfb = NewWorkflow(name="wfb", workingdir="test_wfb", inputs={"b": 10}, + wf_output_names=[("NB", "out", "NB_out")]) + wfb.add(nb) + wfb.connect_wf_input("b", "NB", "b") + wfb.connect_wf_input("a", "NB", "a") + + # adding 2 nodes and create a connection (as it is now) + wf.add(wfb) + wf.connect("NA", "out", "wfb", "a") + assert wf.nodes[0].mapper == "NA.a" + + sub = Submitter(runnable=wf, plugin=plugin) + sub.run() + sub.close() + + assert wf.is_complete + + expected_A = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)] + for i, res in enumerate(expected_A): + assert wf.result["NA_out"][i][0] == res[0] + assert wf.result["NA_out"][i][1] == res[1] + + # TODO: the naming rememebrs only the node, doesnt remember that a came from NA... + # the naming should have names with workflows?? + expected_B = [({"NB.a": 5, "NB.b": 10}, 15), ({"NB.a": 7, "NB.b": 10}, 17)] + key_sort = list(expected_B[0][0].keys()) + expected_B.sort(key=lambda t: [t[0][key] for key in key_sort]) + wf.result["NB_out"].sort(key=lambda t: [t[0][key] for key in key_sort]) + for i, res in enumerate(expected_B): + assert wf.result["NB_out"][i][0] == res[0] + assert wf.result["NB_out"][i][1] == res[1] + + +# testing CurrentInterface that is a temporary wrapper for current interfaces + +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_current_node_1(change_dir, plugin): + """Node with a current interface and inputs, no mapper, running interface""" + interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") + + nn = NewNode(name="NA", inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}, interface=interf_bet, + workingdir="test_cnd1_{}".format(plugin), output_names=["out_file"]) + + sub = Submitter(plugin=plugin, runnable=nn) + sub.run() + sub.close() + # TODO: nodes only returns relative path + assert "out_file" in nn.output.keys() + + +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_current_node_2(change_dir, plugin): + """Node with a current interface and mapper""" + interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") + + in_file_l = ["/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz", + "/Users/dorota/nipype_workshop/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz"] + nn = NewNode(name="NA", inputs={"in_file": in_file_l}, mapper="in_file", 
interface=interf_bet, print_val=False, + workingdir="test_cnd2_{}".format(plugin), output_names=["out_file"]) + + sub = Submitter(plugin=plugin, runnable=nn) + sub.run() + sub.close() + + assert "out_file" in nn.output.keys() + assert "NA.in_file:0" in nn.output["out_file"].keys() + assert "NA.in_file:1" in nn.output["out_file"].keys() + + +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_current_wf_1(change_dir, plugin): + """Wf with a current interface, no mapper""" + interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") + + nn = NewNode(name="fsl", inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}, interface=interf_bet, + workingdir="nn", output_names=["out_file"], print_val=False) + + wf = NewWorkflow( workingdir="test_cwf_1_{}".format(plugin), name="cw1", wf_output_names=[("fsl", "out_file", "fsl_out")], print_val=False) + wf.add_nodes([nn]) + + sub = Submitter(plugin=plugin, runnable=wf) + sub.run() + sub.close() + + assert "fsl_out" in wf.output.keys() + + +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_current_wf_1a(change_dir, plugin): + """Wf with a current interface, no mapper""" + interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") + + nn = NewNode(name="fsl", inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}, interface=interf_bet, + workingdir="nn", output_names=["out_file"], print_val=False) + + wf = NewWorkflow(workingdir="test_cwf_1a_{}".format(plugin), name="cw1", wf_output_names=[("fsl", "out_file", "fsl_out")], print_val=False) + wf.add(runnable=nn) + + sub = Submitter(plugin=plugin, runnable=wf) + sub.run() + sub.close() + + assert "fsl_out" in wf.output.keys() + + +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_current_wf_1b(change_dir, plugin): + """Wf with a current interface, no mapper; using wf.add(nipype CurrentInterface)""" + interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") + + wf = NewWorkflow(workingdir="test_cwf_1b_{}".format(plugin), name="cw1", wf_output_names=[("fsl", "out_file", "fsl_out")], print_val=False) + wf.add(runnable=interf_bet, name="fsl", workingdir="nn", output_names=["out_file"], print_val=False, + inputs={"in_file": "/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}) + + sub = Submitter(plugin=plugin, runnable=wf) + sub.run() + sub.close() + + assert "fsl_out" in wf.output.keys() + + +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_current_wf_1c(change_dir, plugin): + """Wf with a current interface, no mapper; using wf.add(nipype interface) """ + + wf = NewWorkflow(workingdir="test_cwf_1c_{}".format(plugin), name="cw1", wf_output_names=[("fsl", "out_file", "fsl_out")], print_val=False) + wf.add(runnable=fsl.BET(), name="fsl", workingdir="nn", output_names=["out_file"], print_val=False, + inputs={"in_file": 
"/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz"}) + + sub = Submitter(plugin=plugin, runnable=wf) + sub.run() + sub.close() + + assert "fsl_out" in wf.output.keys() + + +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_current_wf_2(change_dir, plugin): + """Wf with a current interface and mapper""" + interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") + + in_file_l = ["/Users/dorota/nipype_workshop/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz", + "/Users/dorota/nipype_workshop/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz"] + + nn = NewNode(name="fsl", interface=interf_bet, print_val=False, + workingdir="nn", output_names=["out_file"]) + + wf = NewWorkflow( workingdir="test_cwf_2_{}".format(plugin), name="cw2", wf_output_names=[("fsl", "out_file", "fsl_out")], + inputs={"in_file": in_file_l}, mapper="in_file", print_val=False) + wf.add_nodes([nn]) + wf.connect_wf_input("in_file", "fsl", "in_file") + + sub = Submitter(plugin=plugin, runnable=wf) + sub.run() + sub.close() + + assert "fsl_out" in wf.output.keys() + assert 'cw2.in_file:0' in wf.output["fsl_out"].keys() + assert 'cw2.in_file:1' in wf.output["fsl_out"].keys() + + +@pytest.mark.skipif(not os.path.exists("/Users/dorota/nipype_workshop/data/ds000114"), reason="adding data") +@pytest.mark.parametrize("plugin", Plugins) +@python35_only +def test_current_wf_2a(change_dir, plugin): + """Wf with a current interface and mapper""" + interf_bet = CurrentInterface(interface=fsl.BET(), name="fsl_interface") + + in_file_l = ["/data/ds000114/sub-01/ses-test/anat/sub-01_ses-test_T1w.nii.gz", + "/data/ds000114/sub-02/ses-test/anat/sub-02_ses-test_T1w.nii.gz"] + + nn = NewNode(name="fsl", interface=interf_bet, print_val=False, + workingdir="nn", output_names=["out_file"], + inputs={"in_file": in_file_l}, mapper="in_file") + + wf = NewWorkflow( workingdir="test_cwf_2a_{}".format(plugin), name="cw2a", wf_output_names=[("fsl", "out_file", "fsl_out")], + print_val=False) + wf.add_nodes([nn]) + # wf.connect_wf_input("in_file", "fsl", "in_file") + + sub = Submitter(plugin=plugin, runnable=wf) + sub.run() + sub.close() + + assert "fsl_out" in wf.output.keys() + assert 'fsl.in_file:0' in wf.output["fsl_out"].keys() + assert 'fsl.in_file:1' in wf.output["fsl_out"].keys() diff --git a/nipype/pipeline/engine/tests/test_newnode_neuro.py b/nipype/pipeline/engine/tests/test_newnode_neuro.py new file mode 100644 index 0000000000..eef8841f4f --- /dev/null +++ b/nipype/pipeline/engine/tests/test_newnode_neuro.py @@ -0,0 +1,162 @@ +import os +import pytest + +from nipype.pipeline.engine import NewNode, NewWorkflow +from ..submitter import Submitter + +#dj niworkflows vs ...?? 
+from nipype.interfaces.utility import Rename +import nipype.interfaces.freesurfer as fs + +from fmriprep.interfaces.freesurfer import PatchedConcatenateLTA as ConcatenateLTA + +@pytest.fixture() +def change_dir(request): + orig_dir = os.getcwd() + test_dir = os.path.join(orig_dir, "/nipype/nipype/pipeline/engine/test_neuro") + os.makedirs(test_dir, exist_ok=True) + os.chdir(test_dir) + + def move2orig(): + os.chdir(orig_dir) + + request.addfinalizer(move2orig) + +import pdb + +Name = "example" +DEFAULT_MEMORY_MIN_GB = None +# TODO, adding fields to Inputs (subject_id) +Inputs = {"subject_id": "sub-01", + "output_spaces": ["fsaverage", "fsaverage5"], + "source_file": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/func_preproc_ses_test_task_fingerfootlips_wf/bold_t1_trans_wf/merge/vol0000_xform-00000_merged.nii", + "t1_preproc": "/fmriprep_test/output1/fmriprep/sub-01/anat/sub-01_T1w_preproc.nii.gz", + "t1_2_fsnative_forward_transform": "/fmriprep_test/workdir1/fmriprep_wf/single_subject_01_wf/anat_preproc_wf/surface_recon_wf/t1_2_fsnative_xfm/out.lta", + "subjects_dir": "/fmriprep_test/output1/freesurfer/" +} + +Plugins = ["serial", "mp", "cf", "dask"] + +def select_target(subject_id, space): + """ Given a source subject ID and a target space, get the target subject ID """ + return subject_id if space == 'fsnative' else space + +@pytest.mark.parametrize("plugin", Plugins) +def test_neuro(change_dir, plugin): + + # wf = Workflow(name, mem_gb_node=DEFAULT_MEMORY_MIN_GB, + # inputs=['source_file', 't1_preproc', 'subject_id', + # 'subjects_dir', 't1_2_fsnative_forward_transform', + # 'mem_gb', 'output_spaces', 'medial_surface_nan'], + # outputs='surfaces') + # + #dj: why do I need outputs? + + + wf = NewWorkflow(name=Name, inputs=Inputs, workingdir="test_neuro_{}".format(plugin), print_val=False, + wf_output_names=[("sampler", "out_file", "sampler_out"), ("targets", "out", "target_out")]) + + # @interface + # def select_target(subject_id, space): + # """ Given a source subject ID and a target space, get the target subject ID """ + # return subject_id if space == 'fsnative' else space + + + + # wf.add('targets', select_target(subject_id=wf.inputs.subject_id)) + # .map('space', space=[space for space in wf.inputs.output_spaces + # if space.startswith('fs')]) + + #dj: don't have option in map to connect with wf input + + wf.add(runnable=select_target, name="targets", subject_id="subject_id", output_names=["out"], + out_read=True, print_val=False)\ + .map_node(mapper="space", inputs={"space": [space for space in Inputs["output_spaces"] + if space.startswith("fs")]}) + + # wf.add('rename_src', Rename(format_string='%(subject)s', + # keep_ext=True, + # in_file=wf.inputs.source_file)) + # .map('subject') + + wf.add(name='rename_src', + runnable=Rename(format_string='%(subject)s', keep_ext=True), + in_file="source_file", subject="subject_id", + output_names=["out_file"], + print_val=False)\ + # .map_node('subject') #TODO: now it's only one subject + + + # wf.add('resampling_xfm', + # fs.utils.LTAConvert(in_lta='identity.nofile', + # out_lta=True, + # source_file=wf.inputs.source_file, + # target_file=wf.inputs.t1_preproc) + # .add('set_xfm_source', ConcatenateLTA(out_type='RAS2RAS', + # in_lta2=wf.inputs.t1_2_fsnative_forward_transform, + # in_lta1=wf.resampling_xfm.out_lta)) + + + wf.add(name='resampling_xfm', + runnable=fs.utils.LTAConvert(in_lta='identity.nofile', out_lta=True), + source_file="source_file", target_file="t1_preproc", + output_names=["out_lta"], + print_val=False)\ + 
.add(name='set_xfm_source', runnable=ConcatenateLTA(out_type='RAS2RAS'), + in_lta2="t1_2_fsnative_forward_transform", in_lta1="resampling_xfm.out_lta", + output_names=["out_file"], print_val=False) + + + + # wf.add('sampler', + # fs.SampleToSurface(sampling_method='average', sampling_range=(0, 1, 0.2), + # sampling_units='frac', interp_method='trilinear', + # cortex_mask=True, override_reg_subj=True, + # out_type='gii', + # subjects_dir=wf.inputs.subjects_dir, + # subject_id=wf.inputs.subject_id, + # reg_file=wf.set_xfm_source.out_file, + # target_subject=wf.targets.out, + # source_file=wf.rename_src.out_file), + # mem_gb=mem_gb * 3) + # .map([('source_file', 'target_subject'), 'hemi'], hemi=['lh', 'rh']) + + + wf.add(name='sampler', + runnable=fs.SampleToSurface(sampling_method='average', sampling_range=(0, 1, 0.2), + sampling_units='frac', interp_method='trilinear', + cortex_mask=True, override_reg_subj=True, + out_type='gii'), print_val=False, + subjects_dir="subjects_dir", subject_id="subject_id", reg_file="set_xfm_source.out_file", + target_subject="targets.out", source_file="rename_src.out_file", output_names=["out_file"])\ + .map_node(mapper=['_targets', 'hemi'], inputs={"hemi": ['lh', 'rh']}) + + + sub = Submitter(plugin=plugin, runnable=wf) + sub.run() + sub.close() + + assert "target_out" in wf.output.keys() + assert len(list(wf.output["target_out"].keys())) == 2 + + assert "sampler_out" in wf.output.keys() + assert len(list(wf.output["sampler_out"].keys())) == 4 + # dj: no conditions + # dj: no join for now + + # wf.add_cond('cond1', + # condition=wf.inputs.medial_surface_nan, + # iftrue=wf.add('medial_nans', MedialNaNs(subjects_dir=wf.inputs.subjects_dir, + # in_file=wf.sampler.out_file, + # target_subject=wf.targets.out)) + # .set_output('out', wf.median_nans.out), + # elseclause=wf.set_output('out', wf.sampler.out_file)) + # + # wf.add('merger', niu.Merge(1, ravel_inputs=True, + # in1=wf.cond1.out), + # run_without_submitting=True) + # .join('sampler.hemi') + # + # wf.add('update_metadata', + # GiftiSetAnatomicalStructure(in_file=wf.merger.out)) + # wf.outputs.surfaces = wf.update_metadata.out_file \ No newline at end of file diff --git a/nipype/pipeline/engine/workers.py b/nipype/pipeline/engine/workers.py new file mode 100644 index 0000000000..456aed49ff --- /dev/null +++ b/nipype/pipeline/engine/workers.py @@ -0,0 +1,100 @@ +from __future__ import print_function, division, unicode_literals, absolute_import +from builtins import object +from collections import defaultdict + +from future import standard_library +standard_library.install_aliases() + +from copy import deepcopy +import re, os, pdb, time +import multiprocessing as mp +#import multiprocess as mp +import itertools + +#from pycon_utils import make_cluster +from dask.distributed import Client + +import concurrent.futures as cf + +from ... 
import config, logging +logger = logging.getLogger('nipype.workflow') + + +class Worker(object): + def __init__(self): + logger.debug("Initialize Worker") + pass + + def run_el(self): + raise NotImplementedError + + def close(self): + raise NotImplementedError + + +class MpWorker(Worker): + def __init__(self, nr_proc=4): #should be none + self.nr_proc = nr_proc + self.pool = mp.Pool(processes=self.nr_proc) + logger.debug('Initialize MpWorker') + + def run_el(self, interface, inp): + self.pool.apply_async(interface, (inp[0], inp[1])) + + def close(self): + # added this method since I was having somtetimes problem with reading results from (existing) files + # i thought that pool.close() should work, but still was getting some errors, so testing terminate + self.pool.terminate() + + +class SerialWorker(Worker): + def __init__(self): + logger.debug("Initialize SerialWorker") + pass + + def run_el(self, interface, inp): + interface(inp[0], inp[1]) + + def close(self): + pass + + +class ConcurrentFuturesWorker(Worker): + def __init__(self, nr_proc=4): + self.nr_proc = nr_proc + self.pool = cf.ProcessPoolExecutor(self.nr_proc) + logger.debug('Initialize ConcurrentFuture') + + def run_el(self, interface, inp): + x = self.pool.submit(interface, inp[0], inp[1]) + #print("X, DONE", x.done()) + x.add_done_callback(lambda x: print("DONE ", interface, inp, x.done)) + #print("DIR", x.result()) + + def close(self): + self.pool.shutdown() + + +class DaskWorker(Worker): + def __init__(self): + from distributed.deploy.local import LocalCluster + logger.debug("Initialize Dask Worker") + #self.cluster = LocalCluster() + self.client = Client()#self.cluster) + #print("BOKEH", self.client.scheduler_info()["address"] + ":" + str(self.client.scheduler_info()["services"]["bokeh"])) + + + def run_el(self, interface, inp): + print("DASK, run_el: ", interface, inp, time.time()) + # dask doesn't copy the node second time, so it doesn't see that I change input in the meantime (??) + x = self.client.submit(interface, inp[0], inp[1]) + print("DASK, status: ", x.status) + # this important, otherwise dask will not finish the job + x.add_done_callback(lambda x: print("DONE ", interface, inp)) + print("res", x.result()) + + + def close(self): + #self.cluster.close() + self.client.close() + diff --git a/nipype/pipeline/engine/workflows.py b/nipype/pipeline/engine/workflows.py index c0e253c0e3..41b8414364 100644 --- a/nipype/pipeline/engine/workflows.py +++ b/nipype/pipeline/engine/workflows.py @@ -10,16 +10,17 @@ absolute_import) from builtins import str, bytes, open -import os +import os, glob import os.path as op import sys from datetime import datetime -from copy import deepcopy +from copy import copy, deepcopy import pickle import shutil import numpy as np import networkx as nx +import collections, itertools from ... import config, logging from ...exceptions import NodeError, WorkflowError, MappingError, JoinError @@ -28,13 +29,18 @@ from ...interfaces.base import (traits, TraitedSpec, TraitDictObject, TraitListObject) -from ...utils.filemanip import save_json, makedirs, to_str +from ...utils.filemanip import save_json, makedirs, to_str, loadpkl from .utils import (generate_expanded_graph, export_graph, write_workflow_prov, write_workflow_resources, format_dot, topological_sort, get_print_name, merge_dict, format_node) from .base import EngineBase from .nodes import MapNode, Node +from . import state +from . import auxiliary as aux +from . 
import submitter as sub + +import pdb # Py2 compat: http://python-future.org/compatible_idioms.html#collections-counter-and-ordereddict from future import standard_library @@ -341,7 +347,7 @@ def remove_nodes(self, nodes): # Input-Output access @property def inputs(self): - return self._get_inputs() + return self.get_inputs() @property def outputs(self): @@ -764,7 +770,7 @@ def _check_outputs(self, parameter): def _check_inputs(self, parameter): return self._has_attr(parameter, subtype='in') - def _get_inputs(self): + def get_inputs(self): """Returns the inputs of a workflow This function does not return any input ports that are already @@ -1046,7 +1052,7 @@ def _get_dot(self, return ('\n' + prefix).join(dotlist) def add(self, name, node_like): - if is_interface(node_like): + if is_function_interface(node_like): node = Node(node_like, name=name) elif is_node(node_like): node = node_like @@ -1065,88 +1071,679 @@ class Join(Node): class MapState(object): pass -class NewNode(EngineBase): - def __init__(self, inputs={}, map_on=None, join_by=None, - *args, **kwargs): - self._mappers = {} - self._joiners = {} - - def map(self, field, values=None): - if isinstance(field, list): - for field_ - if values is not None: - if len(values != len(field)): - elif isinstance(field, tuple): - pass - if values is None: - values = getattr(self._inputs, field) - if values is None: - raise MappingError('Cannot map unassigned input field') - self._mappers[field] = values +# dj ??: should I use EngineBase? +class NewBase(object): + def __init__(self, name, mapper=None, inputs=None, other_mappers=None, mem_gb=None, + cache_location=None, print_val=True, *args, **kwargs): + self.name = name + #dj TODO: I should think what is needed in the __init__ (I redefine some of rhe attributes anyway) + if inputs: + # adding name of the node to the input name + self._inputs = dict(("{}.{}".format(self.name, key), value) for (key, value) in inputs.items()) + self._inputs = dict((key, np.array(val)) if type(val) is list else (key, val) + for (key, val) in self._inputs.items()) + self._state_inputs = self._inputs.copy() + else: + self._inputs = {} + self._state_inputs = {} + if mapper: + # adding name of the node to the input name within the mapper + mapper = aux.change_mapper(mapper, self.name) + self._mapper = mapper + # information about other nodes' mappers from workflow (in case the mapper from previous node is used) + self._other_mappers = other_mappers + # create state (takes care of mapper, connects inputs with axes, so we can ask for specifc element) + self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers) + self._output = {} + self._result = {} + # flag that says if the node/wf is ready to run (has all input) + self.ready2run = True + # needed outputs from other nodes if the node part of a wf + self.needed_outputs = [] + # flag that says if node finished all jobs + self._is_complete = False + # flag that says if value of state input should be printed in output and directories (otherwise indices) + self.print_val = print_val + + # TODO: don't use it yet + self.mem_gb = mem_gb + self.cache_location = cache_location + + + # TBD def join(self, field): pass + @property + def state(self): + return self._state -class NewWorkflow(NewNode): - def __init__(self, inputs={}, *args, **kwargs): - super(NewWorkflow, self).__init__(*args, **kwargs) + @property + def mapper(self): + return self._mapper - self._nodes = {} + @mapper.setter + def mapper(self, mapper): + self._mapper = mapper + 
# updating state + self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers) - mro = self.__class__.mro() - wf_klasses = mro[:mro.index(NewWorkflow)][::-1] - items = {} - for klass in wf_klasses: - items.update(klass.__dict__) - for name, runnable in items.items(): - if name in ('__module__', '__doc__'): - continue + @property + def state_inputs(self): + return self._state_inputs - self.add(name, value) + @state_inputs.setter + def state_inputs(self, state_inputs): + self._state_inputs.update(state_inputs) - def add(self, name, runnable): - if is_function(runnable): - node = Node(Function(function=runnable), name=name) - elif is_interface(runnable): - node = Node(runnable, name=name) - elif is_node(runnable): - node = runnable if runnable.name == name else runnable.clone(name=name) + + @property + def output(self): + return self._output + + @property + def result(self): + if not self._result: + self._reading_results() + return self._result + + + def prepare_state_input(self): + self._state.prepare_state_input(state_inputs=self.state_inputs) + + + def map(self, mapper, inputs=None): + if self._mapper: + raise Exception("mapper is already set") else: - raise ValueError("Unknown workflow element: {!r}".format(runnable)) - setattr(self, name, node) - self._nodes[name] = node - self._last_added = name - - def map(self, field, node=None, values=None): - if node is None: - if '.' in field: - node, field = field.rsplit('.', 1) + self._mapper = aux.change_mapper(mapper, self.name) + + if inputs: + inputs = dict(("{}.{}".format(self.name, key), value) for (key, value) in inputs.items()) + inputs = dict((key, np.array(val)) if type(val) is list else (key, val) + for (key, val) in inputs.items()) + self._inputs.update(inputs) + self._state_inputs.update(inputs) + if mapper: + # updating state if we have a new mapper + self._state = state.State(mapper=self._mapper, node_name=self.name, other_mappers=self._other_mappers) + + + def join(self, field, node=None): + # TBD + pass + + + def checking_input_el(self, ind): + """checking if all inputs are available (for specific state element)""" + try: + self.get_input_el(ind) + return True + except: #TODO specify + return False + + + # dj: this is not used for a single node + def get_input_el(self, ind): + """collecting all inputs required to run the node (for specific state element)""" + state_dict = self.state.state_values(ind) + inputs_dict = {k: state_dict[k] for k in self._inputs.keys()} + if not self.print_val: + state_dict = self.state.state_ind(ind) + # reading extra inputs that come from previous nodes + for (from_node, from_socket, to_socket) in self.needed_outputs: + dir_nm_el_from = "_".join(["{}:{}".format(i, j) for i, j in list(state_dict.items()) + if i in list(from_node._state_inputs.keys())]) + if not from_node.mapper: + dir_nm_el_from = "" + + if is_node(from_node) and is_current_interface(from_node.interface): + file_from = self._reading_ci_output(node=from_node, dir_nm_el=dir_nm_el_from, out_nm=from_socket) + if file_from and os.path.exists(file_from): + inputs_dict["{}.{}".format(self.name, to_socket)] = file_from + else: + raise Exception("{} doesnt exist".format(file_from)) + else: # assuming here that I want to read the file (will not be used with the current interfaces) + file_from = os.path.join(from_node.workingdir, dir_nm_el_from, from_socket+".txt") + with open(file_from) as f: + content = f.readline() + try: + inputs_dict["{}.{}".format(self.name, to_socket)] = eval(content) + except 
NameError: + inputs_dict["{}.{}".format(self.name, to_socket)] = content + + return state_dict, inputs_dict + + def _reading_ci_output(self, dir_nm_el, out_nm, node=None): + """used for current interfaces: checking if the output exists and returns the path if it does""" + if not node: + node = self + result_pklfile = os.path.join(os.getcwd(), node.workingdir, dir_nm_el, + node.interface.nn.name, "result_{}.pklz".format(node.interface.nn.name)) + if os.path.exists(result_pklfile): + out_file = getattr(loadpkl(result_pklfile).outputs, out_nm) + if os.path.exists(out_file): + return out_file else: - node = self._last_added + return False + else: + return False - if '.' in node: - subwf, node = node.split('.', 1) - self._nodes[subwf].map(field, node, values) - return - if node in self._mappers: + # checking if all outputs are saved + @property + def is_complete(self): + # once _is_complete os True, this should not change + logger.debug('is_complete {}'.format(self._is_complete)) + if self._is_complete: + return self._is_complete + else: + return self._check_all_results() + + + def get_output(self): + raise NotImplementedError + + def _check_all_results(self): + raise NotImplementedError + + def _reading_results(self): + raise NotImplementedError + + + def _dict_tuple2list(self, container): + if type(container) is dict: + val_l = [val for (_, val) in container.items()] + elif type(container) is tuple: + val_l = [container] + else: + raise Exception("{} has to be dict or tuple".format(container)) + return val_l + + +class NewNode(NewBase): + def __init__(self, name, interface, inputs=None, mapper=None, join_by=None, + workingdir=None, other_mappers=None, mem_gb=None, cache_location=None, + output_names=None, print_val=True, *args, **kwargs): + super(NewNode, self).__init__(name=name, mapper=mapper, inputs=inputs, + other_mappers=other_mappers, mem_gb=mem_gb, + cache_location=cache_location, print_val=print_val, + *args, **kwargs) + + # working directory for node, will be change if node is a part of a wf + self.workingdir = workingdir + self.interface = interface + + if is_function_interface(self.interface): + # adding node name to the interface's name mapping + self.interface.input_map = dict((key, "{}.{}".format(self.name, value)) + for (key, value) in self.interface.input_map.items()) + # list of output names taken from interface output name + self.output_names = self.interface._output_nm + elif is_current_interface(self.interface): + # list of interf_key_out + self.output_names = output_names + if not self.output_names: + self.output_names = [] + + + + # dj: not sure if I need it + # def __deepcopy__(self, memo): # memo is a dict of id's to copies + # id_self = id(self) # memoization avoids unnecesary recursion + # _copy = memo.get(id_self) + # if _copy is None: + # # changing names of inputs and input_map, so it doesnt contain node.name + # inputs_copy = dict((key[len(self.name)+1:], deepcopy(value)) + # for (key, value) in self.inputs.items()) + # interface_copy = deepcopy(self.interface) + # interface_copy.input_map = dict((key, val[len(self.name)+1:]) + # for (key, val) in interface_copy.input_map.items()) + # _copy = type(self)( + # name=deepcopy(self.name), interface=interface_copy, + # inputs=inputs_copy, mapper=deepcopy(self.mapper), + # base_dir=deepcopy(self.nodedir), other_mappers=deepcopy(self._other_mappers)) + # memo[id_self] = _copy + # return _copy + + + @property + def inputs(self): + return self._inputs + + @inputs.setter + def inputs(self, inputs): + 
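+        # note: unlike __init__, this setter does not prefix keys with the node name and it
+        # merges into the existing dict; callers are expected to pass fully qualified
+        # "<node>.<input>" keys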
self._inputs.update(inputs) + + + def run_interface_el(self, i, ind): + """ running interface one element generated from node_state.""" + logger.debug("Run interface el, name={}, i={}, ind={}".format(self.name, i, ind)) + state_dict, inputs_dict = self.get_input_el(ind) + if not self.print_val: + state_dict = self.state.state_ind(ind) + dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(state_dict.items())]) + print("Run interface el, dict={}".format(state_dict)) + logger.debug("Run interface el, name={}, inputs_dict={}, state_dict={}".format( + self.name, inputs_dict, state_dict)) + if is_function_interface(self.interface): + res = self.interface.run(inputs_dict) + output = self.interface.output + print("Run fun interface el, output={}".format(output)) + logger.debug("Run fun interface el, output={}".format(output)) + self._writting_results_tmp(state_dict, dir_nm_el, output) + elif is_current_interface(self.interface): + if not self.mapper: + dir_nm_el = "" + res = self.interface.run(inputs=inputs_dict, base_dir=os.path.join(os.getcwd(), self.workingdir), + dir_nm_el=dir_nm_el) + + # TODO when join + #if self._joinByKey: + # dir_join = "join_" + "_".join(["{}.{}".format(i, j) for i, j in list(state_dict.items()) if i not in self._joinByKey]) + #elif self._join: + # dir_join = "join_" + #if self._joinByKey or self._join: + # os.makedirs(os.path.join(self.nodedir, dir_join), exist_ok=True) + # dir_nm_el = os.path.join(dir_join, dir_nm_el) + return res + + + def _writting_results_tmp(self, state_dict, dir_nm_el, output): + """temporary method to write the results in the files (this is usually part of a interface)""" + if not self.mapper: + dir_nm_el = '' + os.makedirs(os.path.join(self.workingdir, dir_nm_el), exist_ok=True) + for key_out, val_out in output.items(): + with open(os.path.join(self.workingdir, dir_nm_el, key_out+".txt"), "w") as fout: + fout.write(str(val_out)) + + + def get_output(self): + """collecting all outputs and updating self._output""" + for key_out in self.output_names: + self._output[key_out] = {} + for (i, ind) in enumerate(itertools.product(*self.state.all_elements)): + if self.print_val: + state_dict = self.state.state_values(ind) + else: + state_dict = self.state.state_ind(ind) + dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(state_dict.items())]) + if self.mapper: + if is_function_interface(self.interface): + output = os.path.join(self.workingdir, dir_nm_el, key_out + ".txt") + if self.interface.out_read: + with open(output) as fout: + content = fout.readline() + try: + output = eval(content) + except NameError: + output = content + self._output[key_out][dir_nm_el] = (state_dict, output) + elif is_current_interface(self.interface): + self._output[key_out][dir_nm_el] = \ + (state_dict, (state_dict, self._reading_ci_output(dir_nm_el=dir_nm_el, out_nm=key_out))) + else: + if is_function_interface(self.interface): + output = os.path.join(self.workingdir, key_out + ".txt") + if self.interface.out_read: + with open(output) as fout: + try: + output = eval(fout.readline()) + except NewWorkflow: + output = fout.readline() + self._output[key_out] = (state_dict, output) + elif is_current_interface(self.interface): + self._output[key_out] = \ + (state_dict, self._reading_ci_output(dir_nm_el="", out_nm=key_out)) + return self._output + + + # dj: version without join + def _check_all_results(self): + """checking if all files that should be created are present""" + for ind in itertools.product(*self.state.all_elements): + if self.print_val: + state_dict = 
self.state.state_values(ind) + else: + state_dict = self.state.state_ind(ind) + dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(state_dict.items())]) + if not self.mapper: + dir_nm_el = "" + + for key_out in self.output_names: + if is_function_interface(self.interface): + if not os.path.isfile(os.path.join(self.workingdir, dir_nm_el, key_out+".txt")): + return False + elif is_current_interface(self.interface): + if not self._reading_ci_output(dir_nm_el, key_out): + return False + self._is_complete = True + return True + + + def _reading_results(self): + """temporary: reading results from output files (that is now just txt) + should be probably just reading output for self.output_names + """ + for key_out in self.output_names: + self._result[key_out] = [] + #pdb.set_trace() + if self._state_inputs: + val_l = self._dict_tuple2list(self._output[key_out]) + for (st_dict, filename) in val_l: + with open(filename) as fout: + self._result[key_out].append((st_dict, eval(fout.readline()))) + else: + # st_dict should be {} + # not sure if this is used (not tested) + (st_dict, filename) = self._output[key_out][None] + with open(filename) as fout: + self._result[key_out].append(({}, eval(fout.readline()))) + + # dj: removing temp. from NewNode class + # def run(self, plugin="serial"): + # """preparing the node to run and run the interface""" + # self.prepare_state_input() + # submitter = sub.SubmitterNode(plugin, node=self) + # submitter.run_node() + # submitter.close() + # self.collecting_output() + + +class NewWorkflow(NewBase): + def __init__(self, name, inputs=None, wf_output_names=None, mapper=None, #join_by=None, + nodes=None, workingdir=None, mem_gb=None, cache_location=None, print_val=True, *args, **kwargs): + super(NewWorkflow, self).__init__(name=name, mapper=mapper, inputs=inputs, mem_gb=mem_gb, + cache_location=cache_location, print_val=print_val, *args, **kwargs) + + self.graph = nx.DiGraph() + # all nodes in the workflow (probably will be removed) + self._nodes = [] + # saving all connection between nodes + self.connected_var = {} + # input that are expected by nodes to get from wf.inputs + self.needed_inp_wf = [] + if nodes: + self.add_nodes(nodes) + for nn in self._nodes: + self.connected_var[nn] = {} + # key: name of a node, value: the node + self._node_names = {} + # key: name of a node, value: mapper of the node + self._node_mappers = {} + # dj: not sure if this should be different than base_dir + self.workingdir = os.path.join(os.getcwd(), workingdir) + # list of (nodename, output name in the name, output name in wf) or (nodename, output name in the name) + # dj: using different name than for node, since this one it is defined by a user + self.wf_output_names = wf_output_names + + # nodes that are created when the workflow has mapper (key: node name, value: list of nodes) + self.inner_nodes = {} + # in case of inner workflow this points to the main/parent workflow + self.parent_wf = None + # dj not sure what was the motivation, wf_klasses gives an empty list + #mro = self.__class__.mro() + #wf_klasses = mro[:mro.index(NewWorkflow)][::-1] + #items = {} + #for klass in wf_klasses: + # items.update(klass.__dict__) + #for name, runnable in items.items(): + # if name in ('__module__', '__doc__'): + # continue + + # self.add(name, value) + + @property + def inputs(self): + return self._inputs + + @inputs.setter + def inputs(self, inputs): + self._inputs.update(dict(("{}.{}".format(self.name, key), value) for (key, value) in inputs.items())) + + + @property + def nodes(self): 
+ return self._nodes + + @property + def graph_sorted(self): + # TODO: should I always update the graph? + return list(nx.topological_sort(self.graph)) + + + def map_node(self, mapper, node=None, inputs=None): + """this is setting a mapper to the wf's nodes (not to the wf)""" + if not node: + node = self._last_added + if node.mapper: raise WorkflowError("Cannot assign two mappings to the same input") + node.map(mapper=mapper, inputs=inputs) + self._node_mappers[node.name] = node.mapper - self._mappers[node] = (field, values) - def join(self, field, node=None): - pass + def get_output(self): + # not sure, if I should collecto output of all nodes or only the ones that are used in wf.output + self.node_outputs = {} + for nn in self.graph: + if self.mapper: + self.node_outputs[nn.name] = [ni.get_output() for ni in self.inner_nodes[nn.name]] + else: + self.node_outputs[nn.name] = nn.get_output() + if self.wf_output_names: + for out in self.wf_output_names: + if len(out) == 2: + node_nm, out_nd_nm, out_wf_nm = out[0], out[1], out[1] + elif len(out) == 3: + node_nm, out_nd_nm, out_wf_nm = out + else: + raise Exception("wf_output_names should have 2 or 3 elements") + if out_wf_nm not in self._output.keys(): + if self.mapper: + self._output[out_wf_nm] = {} + for (i, ind) in enumerate(itertools.product(*self.state.all_elements)): + if self.print_val: + wf_inputs_dict = self.state.state_values(ind) + dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_dict.items())]) + else: + wf_ind_dict = self.state.state_ind(ind) + dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_ind_dict.items())]) + self._output[out_wf_nm][dir_nm_el] = self.node_outputs[node_nm][i][out_nd_nm] + else: + self._output[out_wf_nm] = self.node_outputs[node_nm][out_nd_nm] + else: + raise Exception("the key {} is already used in workflow.result".format(out_wf_nm)) + return self._output + + + # dj: version without join + # TODO: might merge with the function from Node + def _check_all_results(self): + """checking if all files that should be created are present""" + for nn in self.graph_sorted: + if nn.name in self.inner_nodes.keys(): + if not all([ni.is_complete for ni in self.inner_nodes[nn.name]]): + return False + else: + if not nn.is_complete: + return False + self._is_complete = True + return True + + + # TODO: should try to merge with the function from Node + def _reading_results(self): + """reading all results of the workflow + using temporary Node._reading_results that reads txt files + """ + if self.wf_output_names: + for out in self.wf_output_names: + key_out = out[2] if len(out)==3 else out[1] + self._result[key_out] = [] + if self.mapper: + for (i, ind) in enumerate(itertools.product(*self.state.all_elements)): + if self.print_val: + wf_inputs_dict = self.state.state_values(ind) + else: + wf_inputs_dict = self.state.state_ind(ind) + dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_dict.items())]) + res_l= [] + val_l = self._dict_tuple2list(self.output[key_out][dir_nm_el]) + for val in val_l: + with open(val[1]) as fout: + logger.debug('Reading Results: file={}, st_dict={}'.format(val[1], val[0])) + res_l.append((val[0], eval(fout.readline()))) + self._result[key_out].append((wf_inputs_dict, res_l)) + else: + val_l = self._dict_tuple2list(self.output[key_out]) + for val in val_l: + #TODO: I think that val shouldn't be dict here... 
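+                        # (the workaround below keeps only the first value of such a dict,
+                        # so any additional state elements would currently be dropped)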
+ # TMP solution + if type(val) is dict: + val = [v for k,v in val.items()][0] + with open(val[1]) as fout: + logger.debug('Reading Results: file={}, st_dict={}'.format(val[1], val[0])) + self._result[key_out].append((val[0], eval(fout.readline()))) + + + def add_nodes(self, nodes): + """adding nodes without defining connections""" + self.graph.add_nodes_from(nodes) + for nn in nodes: + self._nodes.append(nn) + #self._inputs.update(nn.inputs) + self.connected_var[nn] = {} + self._node_names[nn.name] = nn + self._node_mappers[nn.name] = nn.mapper + + + def add(self, runnable, name=None, workingdir=None, inputs=None, output_names=None, mapper=None, + mem_gb=None, print_val=True, out_read=False, **kwargs): + if is_function(runnable): + if not output_names: + output_names = ["out"] + interface = aux.FunctionInterface(function=runnable, output_nm=output_names, out_read=out_read) + if not name: + raise Exception("you have to specify name for the node") + if not workingdir: + workingdir = name + node = NewNode(interface=interface, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper, + other_mappers=self._node_mappers, mem_gb=mem_gb, print_val=print_val) + elif is_function_interface(runnable) or is_current_interface(runnable): + if not name: + raise Exception("you have to specify name for the node") + if not workingdir: + workingdir = name + node = NewNode(interface=runnable, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper, + other_mappers=self._node_mappers, mem_gb_node=mem_gb, output_names=output_names, + print_val=print_val) + elif is_nipype_interface(runnable): + ci = aux.CurrentInterface(interface=runnable, name=name) + if not name: + raise Exception("you have to specify name for the node") + if not workingdir: + workingdir = name + node = NewNode(interface=ci, workingdir=workingdir, name=name, inputs=inputs, mapper=mapper, + other_mappers=self._node_mappers, mem_gb_node=mem_gb, output_names=output_names, + print_val=print_val) + elif is_node(runnable): + node = runnable + elif is_workflow(runnable): + node = runnable + else: + raise ValueError("Unknown workflow element: {!r}".format(runnable)) + self.add_nodes([node]) + self._last_added = node + + # connecting inputs to other nodes outputs + for (inp, source) in kwargs.items(): + try: + from_node_nm, from_socket = source.split(".") + self.connect(from_node_nm, from_socket, node.name, inp) + # TODO not sure if i need it, just check if from_node_nm is not None?? 
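+                # kwargs of the form inp="node_name.socket" are split here and turned into
+                # node-to-node connections; a value without a "." raises ValueError on the
+                # unpacking above and is handled below as the name of a workflow-level input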
+            except ValueError:
+                self.connect_wf_input(source, node.name, inp)
+        return self
+
+
+    def connect(self, from_node_nm, from_socket, to_node_nm, to_socket):
+        from_node = self._node_names[from_node_nm]
+        to_node = self._node_names[to_node_nm]
+        self.graph.add_edges_from([(from_node, to_node)])
+        if to_node not in self.nodes:
+            self.add_nodes([to_node])
+        self.connected_var[to_node][to_socket] = (from_node, from_socket)
+        # from_node.sending_output.append((from_socket, to_node, to_socket))
+        logger.debug('connecting {} and {}'.format(from_node, to_node))
+
+
+    def connect_wf_input(self, inp_wf, node_nm, inp_nd):
+        self.needed_inp_wf.append((node_nm, inp_wf, inp_nd))
+
+
+    def preparing(self, wf_inputs=None, wf_inputs_ind=None):
+        """Prepare the connected nodes: set the final mapper and state_inputs."""
+        #pdb.set_trace()
+        for node_nm, inp_wf, inp_nd in self.needed_inp_wf:
+            node = self._node_names[node_nm]
+            if "{}.{}".format(self.name, inp_wf) in wf_inputs:
+                node.state_inputs.update({"{}.{}".format(node_nm, inp_nd): wf_inputs["{}.{}".format(self.name, inp_wf)]})
+                node.inputs.update({"{}.{}".format(node_nm, inp_nd): wf_inputs["{}.{}".format(self.name, inp_wf)]})
+            else:
+                raise Exception("{}.{} not in the workflow inputs".format(self.name, inp_wf))
+        for nn in self.graph_sorted:
+            if self.print_val:
+                dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs.items())])
+            else:
+                dir_nm_el = "_".join(["{}:{}".format(i, j) for i, j in list(wf_inputs_ind.items())])
+            if not self.mapper:
+                dir_nm_el = ""
+            nn.workingdir = os.path.join(self.workingdir, dir_nm_el, nn.name)
+            nn._is_complete = False  # helps when multiprocessing is used
+            try:
+                for inp, (out_node, out_var) in self.connected_var[nn].items():
+                    nn.ready2run = False  # it has some history (doesn't have to be in the loop)
+                    nn.state_inputs.update(out_node.state_inputs)
+                    nn.needed_outputs.append((out_node, out_var, inp))
+                    # if there is no mapper provided, I'm assuming that the mapper is taken from the previous node
+                    if (not nn.mapper or nn.mapper == out_node.mapper) and out_node.mapper:
+                        nn.mapper = out_node.mapper
+                    else:
+                        pass
+                        # TODO: implement inner mapper
+            except KeyError:
+                # tmp: we don't care about nn that are not in self.connected_var
+                pass
+
+            nn.prepare_state_input()
+
+    # temporarily removed from NewWorkflow
+    # def run(self, plugin="serial"):
+    #     # self.preparing(wf_inputs=self.inputs)  # moved to submitter
+    #     self.prepare_state_input()
+    #     logger.debug('the sorted graph is: {}'.format(self.graph_sorted))
+    #     submitter = sub.SubmitterWorkflow(workflow=self, plugin=plugin)
+    #     submitter.run_workflow()
+    #     submitter.close()
+    #     self.collecting_output()
 
 def is_function(obj):
     return hasattr(obj, '__call__')
 
+def is_function_interface(obj):
+    return type(obj) is aux.FunctionInterface
 
-def is_interface(obj):
-    return all(hasattr(obj, protocol)
-               for protocol in ('input_spec', 'output_spec', 'run'))
+def is_current_interface(obj):
+    return type(obj) is aux.CurrentInterface
 
+def is_nipype_interface(obj):
+    return hasattr(obj, "_run_interface")
 
 def is_node(obj):
-    return hasattr(obj, itername)
+    return type(obj) is NewNode
+
+def is_workflow(obj):
+    return type(obj) is NewWorkflow
diff --git a/requirements.txt b/requirements.txt
index 5ef00ec98b..6daeee21ab 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -17,3 +17,5 @@ mock
 pydotplus
 pydot>=1.2.3
 packaging
+dask
+distributed
diff --git a/rtd_requirements.txt b/rtd_requirements.txt
index 68a366bbdf..6a84e7c2aa 100644
--- a/rtd_requirements.txt
+++ b/rtd_requirements.txt
@@ -17,3 +17,4 @@ psutil
 matplotlib
 packaging
 numpydoc
+dask
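
Note for reviewers: below is a minimal, hypothetical usage sketch of the NewWorkflow/NewNode API touched in this diff, showing how add(), map_node(), connect_wf_input(), and wf_output_names fit together with get_output()/preparing(). Only the methods visible in the hunks above are taken from the diff; the NewWorkflow constructor keywords (name, workingdir, inputs, wf_output_names) and the mapper string are assumptions for illustration, and execution still goes through the submitter rather than a workflow-level run().

# Hypothetical sketch only; constructor keywords and mapper values are assumed, not taken from this diff.
from nipype.pipeline.engine import NewWorkflow

def double(a):
    return a * 2

def add_two(b):
    return b + 2

wf = NewWorkflow(name="wf", workingdir="wf_dir",          # assumed constructor signature
                 inputs={"a": [3, 5]},
                 wf_output_names=[("sum", "out", "wf_out")])  # (node, node output, workflow output)

# add() wraps a plain function in a FunctionInterface-backed NewNode;
# a keyword of the form <input>="<node>.<socket>" becomes a connect() call,
# while a value without "." falls back to connect_wf_input().
wf.add(double, name="double", output_names=["out"])
wf.map_node(mapper="a")   # attaches a mapper to the last added node ("double")
wf.add(add_two, name="sum", output_names=["out"], b="double.out")
wf.connect_wf_input("a", "double", "a")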