|
10 | 10 | import os
|
11 | 11 | import sys
|
12 | 12 | from copy import deepcopy
|
13 |
| -from shutil import rmtree |
14 | 13 | import pytest
|
15 | 14 |
|
16 | 15 | from ... import engine as pe
|
17 | 16 | from ....interfaces import base as nib
|
18 | 17 | from ....interfaces import utility as niu
|
19 | 18 | from .... import config
|
20 |
| -from ..utils import merge_dict, clean_working_directory, write_workflow_prov |
| 19 | +from ..utils import clean_working_directory, write_workflow_prov |
| 20 | + |
| 21 | + |
| 22 | +class InputSpec(nib.TraitedSpec): |
| 23 | + in_file = nib.File(exists=True, copyfile=True) |
| 24 | + |
| 25 | + |
| 26 | +class OutputSpec(nib.TraitedSpec): |
| 27 | + output1 = nib.traits.List(nib.traits.Int, desc='outputs') |
| 28 | + |
| 29 | + |
| 30 | +class UtilsTestInterface(nib.BaseInterface): |
| 31 | + input_spec = InputSpec |
| 32 | + output_spec = OutputSpec |
| 33 | + |
| 34 | + def _run_interface(self, runtime): |
| 35 | + runtime.returncode = 0 |
| 36 | + return runtime |
| 37 | + |
| 38 | + def _list_outputs(self): |
| 39 | + outputs = self._outputs().get() |
| 40 | + outputs['output1'] = [1] |
| 41 | + return outputs |
21 | 42 |
|
22 | 43 |
|
23 | 44 | def test_identitynode_removal(tmpdir):
|
@@ -99,204 +120,11 @@ class InputSpec(nib.TraitedSpec):
|
99 | 120 | config.set_default_config()
|
100 | 121 |
|
101 | 122 |
|
102 |
| -def test_outputs_removal(tmpdir): |
103 |
| - def test_function(arg1): |
104 |
| - import os |
105 |
| - file1 = os.path.join(os.getcwd(), 'file1.txt') |
106 |
| - file2 = os.path.join(os.getcwd(), 'file2.txt') |
107 |
| - fp = open(file1, 'wt') |
108 |
| - fp.write('%d' % arg1) |
109 |
| - fp.close() |
110 |
| - fp = open(file2, 'wt') |
111 |
| - fp.write('%d' % arg1) |
112 |
| - fp.close() |
113 |
| - return file1, file2 |
114 |
| - |
115 |
| - n1 = pe.Node( |
116 |
| - niu.Function( |
117 |
| - input_names=['arg1'], |
118 |
| - output_names=['file1', 'file2'], |
119 |
| - function=test_function), |
120 |
| - base_dir=tmpdir.strpath, |
121 |
| - name='testoutputs') |
122 |
| - n1.inputs.arg1 = 1 |
123 |
| - n1.config = {'execution': {'remove_unnecessary_outputs': True}} |
124 |
| - n1.config = merge_dict(deepcopy(config._sections), n1.config) |
125 |
| - n1.run() |
126 |
| - assert tmpdir.join(n1.name, 'file1.txt').check() |
127 |
| - assert tmpdir.join(n1.name, 'file1.txt').check() |
128 |
| - n1.needed_outputs = ['file2'] |
129 |
| - n1.run() |
130 |
| - assert not tmpdir.join(n1.name, 'file1.txt').check() |
131 |
| - assert tmpdir.join(n1.name, 'file2.txt').check() |
132 |
| - |
133 |
| - |
134 |
| -class InputSpec(nib.TraitedSpec): |
135 |
| - in_file = nib.File(exists=True, copyfile=True) |
136 |
| - |
137 |
| - |
138 |
| -class OutputSpec(nib.TraitedSpec): |
139 |
| - output1 = nib.traits.List(nib.traits.Int, desc='outputs') |
140 |
| - |
141 |
| - |
142 |
| -class UtilsTestInterface(nib.BaseInterface): |
143 |
| - input_spec = InputSpec |
144 |
| - output_spec = OutputSpec |
145 |
| - |
146 |
| - def _run_interface(self, runtime): |
147 |
| - runtime.returncode = 0 |
148 |
| - return runtime |
149 |
| - |
150 |
| - def _list_outputs(self): |
151 |
| - outputs = self._outputs().get() |
152 |
| - outputs['output1'] = [1] |
153 |
| - return outputs |
154 |
| - |
155 |
| - |
156 |
| -def test_inputs_removal(tmpdir): |
157 |
| - file1 = tmpdir.join('file1.txt') |
158 |
| - file1.write('dummy_file') |
159 |
| - n1 = pe.Node( |
160 |
| - UtilsTestInterface(), base_dir=tmpdir.strpath, name='testinputs') |
161 |
| - n1.inputs.in_file = file1.strpath |
162 |
| - n1.config = {'execution': {'keep_inputs': True}} |
163 |
| - n1.config = merge_dict(deepcopy(config._sections), n1.config) |
164 |
| - n1.run() |
165 |
| - assert tmpdir.join(n1.name, 'file1.txt').check() |
166 |
| - n1.inputs.in_file = file1.strpath |
167 |
| - n1.config = {'execution': {'keep_inputs': False}} |
168 |
| - n1.config = merge_dict(deepcopy(config._sections), n1.config) |
169 |
| - n1.overwrite = True |
170 |
| - n1.run() |
171 |
| - assert not tmpdir.join(n1.name, 'file1.txt').check() |
172 |
| - |
173 |
| - |
174 |
| -def test_outputs_removal_wf(tmpdir): |
175 |
| - def test_function(arg1): |
176 |
| - import os |
177 |
| - file1 = os.path.join(os.getcwd(), 'file1.txt') |
178 |
| - file2 = os.path.join(os.getcwd(), 'file2.txt') |
179 |
| - file3 = os.path.join(os.getcwd(), 'file3.txt') |
180 |
| - file4 = os.path.join(os.getcwd(), 'subdir', 'file1.txt') |
181 |
| - files = [file1, file2, file3, file4] |
182 |
| - os.mkdir("subdir") |
183 |
| - for filename in files: |
184 |
| - with open(filename, 'wt') as fp: |
185 |
| - fp.write('%d' % arg1) |
186 |
| - return file1, file2, os.path.join(os.getcwd(), "subdir") |
187 |
| - |
188 |
| - def test_function2(in_file, arg): |
189 |
| - import os |
190 |
| - in_arg = open(in_file).read() |
191 |
| - file1 = os.path.join(os.getcwd(), 'file1.txt') |
192 |
| - file2 = os.path.join(os.getcwd(), 'file2.txt') |
193 |
| - file3 = os.path.join(os.getcwd(), 'file3.txt') |
194 |
| - files = [file1, file2, file3] |
195 |
| - for filename in files: |
196 |
| - with open(filename, 'wt') as fp: |
197 |
| - fp.write('%d' % arg + in_arg) |
198 |
| - return file1, file2, 1 |
199 |
| - |
200 |
| - def test_function3(arg): |
201 |
| - import os |
202 |
| - return arg |
203 |
| - |
204 |
| - for plugin in ('Linear', ): # , 'MultiProc'): |
205 |
| - n1 = pe.Node( |
206 |
| - niu.Function( |
207 |
| - input_names=['arg1'], |
208 |
| - output_names=['out_file1', 'out_file2', 'dir'], |
209 |
| - function=test_function), |
210 |
| - name='n1', |
211 |
| - base_dir=tmpdir.strpath) |
212 |
| - n1.inputs.arg1 = 1 |
213 |
| - |
214 |
| - n2 = pe.Node( |
215 |
| - niu.Function( |
216 |
| - input_names=['in_file', 'arg'], |
217 |
| - output_names=['out_file1', 'out_file2', 'n'], |
218 |
| - function=test_function2), |
219 |
| - name='n2', |
220 |
| - base_dir=tmpdir.strpath) |
221 |
| - n2.inputs.arg = 2 |
222 |
| - |
223 |
| - n3 = pe.Node( |
224 |
| - niu.Function( |
225 |
| - input_names=['arg'], |
226 |
| - output_names=['n'], |
227 |
| - function=test_function3), |
228 |
| - name='n3', |
229 |
| - base_dir=tmpdir.strpath) |
230 |
| - |
231 |
| - wf = pe.Workflow( |
232 |
| - name="node_rem_test" + plugin, base_dir=tmpdir.strpath) |
233 |
| - wf.connect(n1, "out_file1", n2, "in_file") |
234 |
| - |
235 |
| - wf.run(plugin='Linear') |
236 |
| - |
237 |
| - for remove_unnecessary_outputs in [True, False]: |
238 |
| - config.set_default_config() |
239 |
| - wf.config = { |
240 |
| - 'execution': { |
241 |
| - 'remove_unnecessary_outputs': remove_unnecessary_outputs |
242 |
| - } |
243 |
| - } |
244 |
| - rmtree(os.path.join(wf.base_dir, wf.name)) |
245 |
| - wf.run(plugin=plugin) |
246 |
| - |
247 |
| - assert os.path.exists( |
248 |
| - os.path.join(wf.base_dir, wf.name, n1.name, |
249 |
| - 'file2.txt')) != remove_unnecessary_outputs |
250 |
| - assert os.path.exists( |
251 |
| - os.path.join(wf.base_dir, wf.name, n1.name, "subdir", |
252 |
| - 'file1.txt')) != remove_unnecessary_outputs |
253 |
| - assert os.path.exists( |
254 |
| - os.path.join(wf.base_dir, wf.name, n1.name, 'file1.txt')) |
255 |
| - assert os.path.exists( |
256 |
| - os.path.join(wf.base_dir, wf.name, n1.name, |
257 |
| - 'file3.txt')) != remove_unnecessary_outputs |
258 |
| - assert os.path.exists( |
259 |
| - os.path.join(wf.base_dir, wf.name, n2.name, 'file1.txt')) |
260 |
| - assert os.path.exists( |
261 |
| - os.path.join(wf.base_dir, wf.name, n2.name, 'file2.txt')) |
262 |
| - assert os.path.exists( |
263 |
| - os.path.join(wf.base_dir, wf.name, n2.name, |
264 |
| - 'file3.txt')) != remove_unnecessary_outputs |
265 |
| - |
266 |
| - n4 = pe.Node(UtilsTestInterface(), name='n4', base_dir=tmpdir.strpath) |
267 |
| - wf.connect(n2, "out_file1", n4, "in_file") |
268 |
| - |
269 |
| - def pick_first(l): |
270 |
| - return l[0] |
271 |
| - |
272 |
| - wf.connect(n4, ("output1", pick_first), n3, "arg") |
273 |
| - for remove_unnecessary_outputs in [True, False]: |
274 |
| - for keep_inputs in [True, False]: |
275 |
| - config.set_default_config() |
276 |
| - wf.config = { |
277 |
| - 'execution': { |
278 |
| - 'keep_inputs': keep_inputs, |
279 |
| - 'remove_unnecessary_outputs': |
280 |
| - remove_unnecessary_outputs |
281 |
| - } |
282 |
| - } |
283 |
| - rmtree(os.path.join(wf.base_dir, wf.name)) |
284 |
| - wf.run(plugin=plugin) |
285 |
| - assert os.path.exists( |
286 |
| - os.path.join(wf.base_dir, wf.name, n2.name, 'file1.txt')) |
287 |
| - assert os.path.exists( |
288 |
| - os.path.join(wf.base_dir, wf.name, n2.name, |
289 |
| - 'file2.txt')) != remove_unnecessary_outputs |
290 |
| - assert os.path.exists( |
291 |
| - os.path.join(wf.base_dir, wf.name, n4.name, |
292 |
| - 'file1.txt')) == keep_inputs |
293 |
| - |
294 |
| - |
295 |
| -def fwhm(fwhm): |
296 |
| - return fwhm |
297 |
| - |
298 |
| - |
299 | 123 | def create_wf(name):
|
| 124 | + """Creates a workflow for the following tests""" |
| 125 | + def fwhm(fwhm): |
| 126 | + return fwhm |
| 127 | + |
300 | 128 | pipe = pe.Workflow(name=name)
|
301 | 129 | process = pe.Node(
|
302 | 130 | niu.Function(
|
|
0 commit comments