Skip to content

adding verbose as an optional argument to Task.result #241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 41 additions & 11 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,29 +532,37 @@ def done(self):
return True
return False

def _combined_output(self):
def _combined_output(self, return_inputs=False):
combined_results = []
for (gr, ind_l) in self.state.final_combined_ind_mapping.items():
combined_results.append([])
combined_results_gr = []
for ind in ind_l:
result = load_result(self.checksum_states(ind), self.cache_locations)
if result is None:
return None
combined_results[gr].append(result)
if return_inputs is True or return_inputs == "val":
result = (self.state.states_val[ind], result)
elif return_inputs == "ind":
result = (self.state.states_ind[ind], result)
combined_results_gr.append(result)
combined_results.append(combined_results_gr)
if len(combined_results) == 1 and self.state.splitter_rpn_final == []:
# in case it's full combiner, removing the nested structure
return combined_results[0]
else:
return combined_results

def result(self, state_index=None):
def result(self, state_index=None, return_inputs=False):
"""
Retrieve the outcomes of this particular task.

Parameters
----------
state_index :
TODO
state_index : :obj: `int`
index of the element for task with splitter and multiple states
return_inputs : :obj: `bool`, :obj:`str`
if True or "val" result is returned together with values of the input fields,
if "ind" result is returned together with indices of the input fields

Returns
-------
Expand All @@ -567,28 +575,50 @@ def result(self, state_index=None):
if state_index is None:
# if state_index=None, collecting all results
if self.state.combiner:
return self._combined_output()
return self._combined_output(return_inputs=return_inputs)
else:
results = []
for checksum in self.checksum_states():
result = load_result(checksum, self.cache_locations)
if result is None:
return None
results.append(result)
return results
if return_inputs is True or return_inputs == "val":
return list(zip(self.state.states_val, results))
elif return_inputs == "ind":
return list(zip(self.state.states_ind, results))
else:
return results
else: # state_index is not None
if self.state.combiner:
return self._combined_output()[state_index]
return self._combined_output(return_inputs=return_inputs)[
state_index
]
result = load_result(
self.checksum_states(state_index), self.cache_locations
)
return result
if return_inputs is True or return_inputs == "val":
return (self.state.states_val[state_index], result)
elif return_inputs == "ind":
return (self.state.states_ind[state_index], result)
else:
return result
else:
if state_index is not None:
raise ValueError("Task does not have a state")
checksum = self.checksum
result = load_result(checksum, self.cache_locations)
return result
if return_inputs is True or return_inputs == "val":
inputs_val = {
f"{self.name}.{inp}": getattr(self.inputs, inp)
for inp in self.input_names
}
return (inputs_val, result)
elif return_inputs == "ind":
inputs_ind = {f"{self.name}.{inp}": None for inp in self.input_names}
return (inputs_ind, result)
else:
return result

def _reset(self):
"""Reset the connections between inputs and LazyFields."""
Expand Down
130 changes: 113 additions & 17 deletions pydra/engine/tests/test_node_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,18 @@ def test_task_nostate_1(plugin):
# checking the results
results = nn.result()
assert results.output.out == 5
# checking the return_inputs option, either is return_inputs is True, or "val",
# it should give values of inputs that corresponds to the specific element
results_verb = nn.result(return_inputs=True)
results_verb_val = nn.result(return_inputs="val")
assert results_verb[0] == results_verb_val[0] == {"NA.a": 3}
assert results_verb[1].output.out == results_verb_val[1].output.out == 5
# checking the return_inputs option return_inputs="ind"
# it should give indices of inputs (instead of values) for each element
results_verb_ind = nn.result(return_inputs="ind")
assert results_verb_ind[0] == {"NA.a": None}
assert results_verb_ind[1].output.out == 5

# checking the output_dir
assert nn.output_dir.exists()

Expand Down Expand Up @@ -709,6 +721,22 @@ def test_task_state_1(plugin):
expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)]
for i, res in enumerate(expected):
assert results[i].output.out == res[1]

# checking the return_inputs option, either return_inputs is True or "val",
# it should give values of inputs that corresponds to the specific element
results_verb = nn.result(return_inputs=True)
results_verb_val = nn.result(return_inputs="val")
for i, res in enumerate(expected):
assert (results_verb[i][0], results_verb[i][1].output.out) == res
assert (results_verb_val[i][0], results_verb_val[i][1].output.out) == res

# checking the return_inputs option return_inputs="ind"
# it should give indices of inputs (instead of values) for each element
results_verb_ind = nn.result(return_inputs="ind")
expected_ind = [({"NA.a": 0}, 5), ({"NA.a": 1}, 7)]
for i, res in enumerate(expected_ind):
assert (results_verb_ind[i][0], results_verb_ind[i][1].output.out) == res

# checking the output_dir
assert nn.output_dir
for odir in nn.output_dir:
Expand Down Expand Up @@ -737,13 +765,14 @@ def test_task_state_1a(plugin):


@pytest.mark.parametrize(
"splitter, state_splitter, state_rpn, expected",
"splitter, state_splitter, state_rpn, expected, expected_ind",
[
(
("a", "b"),
("NA.a", "NA.b"),
["NA.a", "NA.b", "."],
[({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 5, "NA.b": 20}, 25)],
[({"NA.a": 0, "NA.b": 0}, 13), ({"NA.a": 1, "NA.b": 1}, 25)],
),
(
["a", "b"],
Expand All @@ -755,11 +784,19 @@ def test_task_state_1a(plugin):
({"NA.a": 5, "NA.b": 10}, 15),
({"NA.a": 5, "NA.b": 20}, 25),
],
[
({"NA.a": 0, "NA.b": 0}, 13),
({"NA.a": 0, "NA.b": 1}, 23),
({"NA.a": 1, "NA.b": 0}, 15),
({"NA.a": 1, "NA.b": 1}, 25),
],
),
],
)
@pytest.mark.parametrize("plugin", Plugins)
def test_task_state_2(plugin, splitter, state_splitter, state_rpn, expected):
def test_task_state_2(
plugin, splitter, state_splitter, state_rpn, expected, expected_ind
):
""" Tasks with two inputs and a splitter (no combiner)"""
nn = fun_addvar(name="NA").split(splitter=splitter, a=[3, 5], b=[10, 20])

Expand All @@ -777,6 +814,21 @@ def test_task_state_2(plugin, splitter, state_splitter, state_rpn, expected):
results = nn.result()
for i, res in enumerate(expected):
assert results[i].output.out == res[1]

# checking the return_inputs option, either return_inputs is True or "val",
# it should give values of inputs that corresponds to the specific element
results_verb = nn.result(return_inputs=True)
results_verb_val = nn.result(return_inputs="val")
for i, res in enumerate(expected):
assert (results_verb[i][0], results_verb[i][1].output.out) == res
assert (results_verb_val[i][0], results_verb_val[i][1].output.out) == res

# checking the return_inputs option return_inputs="ind"
# it should give indices of inputs (instead of values) for each element
results_verb_ind = nn.result(return_inputs="ind")
for i, res in enumerate(expected_ind):
assert (results_verb_ind[i][0], results_verb_ind[i][1].output.out) == res

# checking the output_dir
assert nn.output_dir
for odir in nn.output_dir:
Expand Down Expand Up @@ -982,11 +1034,25 @@ def test_task_state_comb_1(plugin):

# checking the results
results = nn.result()

# fully combined (no nested list)
combined_results = [res.output.out for res in results]

assert combined_results == [5, 7]

expected = [({"NA.a": 3}, 5), ({"NA.a": 5}, 7)]
expected_ind = [({"NA.a": 0}, 5), ({"NA.a": 1}, 7)]
# checking the return_inputs option, either return_inputs is True or "val",
# it should give values of inputs that corresponds to the specific element
results_verb = nn.result(return_inputs=True)
results_verb_val = nn.result(return_inputs="val")
for i, res in enumerate(expected):
assert (results_verb[i][0], results_verb[i][1].output.out) == res
assert (results_verb_val[i][0], results_verb_val[i][1].output.out) == res
# checking the return_inputs option return_inputs="ind"
# it should give indices of inputs (instead of values) for each element
results_verb_ind = nn.result(return_inputs="ind")
for i, res in enumerate(expected_ind):
assert (results_verb_ind[i][0], results_verb_ind[i][1].output.out) == res

# checking the output_dir
assert nn.output_dir
for odir in nn.output_dir:
Expand All @@ -995,7 +1061,7 @@ def test_task_state_comb_1(plugin):

@pytest.mark.parametrize(
"splitter, combiner, state_splitter, state_rpn, state_combiner, state_combiner_all, "
"state_splitter_final, state_rpn_final, expected",
"state_splitter_final, state_rpn_final, expected, expected_val",
[
(
("a", "b"),
Expand All @@ -1006,7 +1072,8 @@ def test_task_state_comb_1(plugin):
["NA.a", "NA.b"],
None,
[],
[({}, [13, 25])],
[13, 25],
[({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 5, "NA.b": 20}, 25)],
),
(
("a", "b"),
Expand All @@ -1017,7 +1084,8 @@ def test_task_state_comb_1(plugin):
["NA.a", "NA.b"],
None,
[],
[({}, [13, 25])],
[13, 25],
[({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 5, "NA.b": 20}, 25)],
),
(
["a", "b"],
Expand All @@ -1028,7 +1096,11 @@ def test_task_state_comb_1(plugin):
["NA.a"],
"NA.b",
["NA.b"],
[({"NA.b": 10}, [13, 15]), ({"NA.b": 20}, [23, 25])],
[[13, 15], [23, 25]],
[
[({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 5, "NA.b": 10}, 15)],
[({"NA.a": 3, "NA.b": 20}, 23), ({"NA.a": 5, "NA.b": 20}, 25)],
],
),
(
["a", "b"],
Expand All @@ -1039,7 +1111,11 @@ def test_task_state_comb_1(plugin):
["NA.b"],
"NA.a",
["NA.a"],
[({"NA.a": 3}, [13, 23]), ({"NA.a": 5}, [15, 25])],
[[13, 23], [15, 25]],
[
[({"NA.a": 3, "NA.b": 10}, 13), ({"NA.a": 3, "NA.b": 20}, 23)],
[({"NA.a": 5, "NA.b": 10}, 15), ({"NA.a": 5, "NA.b": 20}, 25)],
],
),
(
["a", "b"],
Expand All @@ -1050,7 +1126,13 @@ def test_task_state_comb_1(plugin):
["NA.a", "NA.b"],
None,
[],
[({}, [13, 23, 15, 25])],
[13, 23, 15, 25],
[
({"NA.a": 3, "NA.b": 10}, 13),
({"NA.a": 3, "NA.b": 20}, 23),
({"NA.a": 5, "NA.b": 10}, 15),
({"NA.a": 5, "NA.b": 20}, 25),
],
),
],
)
Expand All @@ -1066,6 +1148,7 @@ def test_task_state_comb_2(
state_splitter_final,
state_rpn_final,
expected,
expected_val,
):
""" Tasks with scalar and outer splitters and partial or full combiners"""
nn = (
Expand All @@ -1080,19 +1163,32 @@ def test_task_state_comb_2(
assert nn.state.splitter_rpn == state_rpn
assert nn.state.combiner == state_combiner

with Submitter(plugin=plugin) as sub:
sub(nn)

assert nn.state.splitter_final == state_splitter_final
assert nn.state.splitter_rpn_final == state_rpn_final
assert set(nn.state.right_combiner_all) == set(state_combiner_all)

with Submitter(plugin=plugin) as sub:
sub(nn)

# checking the results
results = nn.result()
# checking the return_inputs option, either return_inputs is True or "val",
# it should give values of inputs that corresponds to the specific element
results_verb = nn.result(return_inputs=True)

if nn.state.splitter_rpn_final:
for i, res in enumerate(expected):
assert [res.output.out for res in results[i]] == res
# results_verb
for i, res_l in enumerate(expected_val):
for j, res in enumerate(res_l):
assert (results_verb[i][j][0], results_verb[i][j][1].output.out) == res
# if the combiner is full expected is "a flat list"
else:
assert [res.output.out for res in results] == expected
for i, res in enumerate(expected_val):
assert (results_verb[i][0], results_verb[i][1].output.out) == res

combined_results = [[res.output.out for res in res_l] for res_l in results]
for i, res in enumerate(expected):
assert combined_results[i] == res[1]
# checking the output_dir
assert nn.output_dir
for odir in nn.output_dir:
Expand Down Expand Up @@ -1130,7 +1226,7 @@ def test_task_state_comb_singl_1(plugin):


@pytest.mark.parametrize("plugin", Plugins)
def test_task_state_comb_2(plugin):
def test_task_state_comb_3(plugin):
""" task with the simplest splitter, the input is an empty list"""
nn = fun_addtwo(name="NA").split(splitter="a", a=[]).combine(combiner=["a"])

Expand Down
Loading