Skip to content

Commit 65cdd56

Browse files
committed
Merge branch 'master' into enh/bosh
2 parents 7a7b1b3 + 020181d commit 65cdd56

File tree

7 files changed

+333
-60
lines changed

7 files changed

+333
-60
lines changed

pydra/engine/core.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -532,29 +532,37 @@ def done(self):
532532
return True
533533
return False
534534

535-
def _combined_output(self):
535+
def _combined_output(self, return_inputs=False):
536536
combined_results = []
537537
for (gr, ind_l) in self.state.final_combined_ind_mapping.items():
538-
combined_results.append([])
538+
combined_results_gr = []
539539
for ind in ind_l:
540540
result = load_result(self.checksum_states(ind), self.cache_locations)
541541
if result is None:
542542
return None
543-
combined_results[gr].append(result)
543+
if return_inputs is True or return_inputs == "val":
544+
result = (self.state.states_val[ind], result)
545+
elif return_inputs == "ind":
546+
result = (self.state.states_ind[ind], result)
547+
combined_results_gr.append(result)
548+
combined_results.append(combined_results_gr)
544549
if len(combined_results) == 1 and self.state.splitter_rpn_final == []:
545550
# in case it's full combiner, removing the nested structure
546551
return combined_results[0]
547552
else:
548553
return combined_results
549554

550-
def result(self, state_index=None):
555+
def result(self, state_index=None, return_inputs=False):
551556
"""
552557
Retrieve the outcomes of this particular task.
553558
554559
Parameters
555560
----------
556-
state_index :
557-
TODO
561+
state_index : :obj: `int`
562+
index of the element for task with splitter and multiple states
563+
return_inputs : :obj: `bool`, :obj:`str`
564+
if True or "val" result is returned together with values of the input fields,
565+
if "ind" result is returned together with indices of the input fields
558566
559567
Returns
560568
-------
@@ -567,28 +575,50 @@ def result(self, state_index=None):
567575
if state_index is None:
568576
# if state_index=None, collecting all results
569577
if self.state.combiner:
570-
return self._combined_output()
578+
return self._combined_output(return_inputs=return_inputs)
571579
else:
572580
results = []
573581
for checksum in self.checksum_states():
574582
result = load_result(checksum, self.cache_locations)
575583
if result is None:
576584
return None
577585
results.append(result)
578-
return results
586+
if return_inputs is True or return_inputs == "val":
587+
return list(zip(self.state.states_val, results))
588+
elif return_inputs == "ind":
589+
return list(zip(self.state.states_ind, results))
590+
else:
591+
return results
579592
else: # state_index is not None
580593
if self.state.combiner:
581-
return self._combined_output()[state_index]
594+
return self._combined_output(return_inputs=return_inputs)[
595+
state_index
596+
]
582597
result = load_result(
583598
self.checksum_states(state_index), self.cache_locations
584599
)
585-
return result
600+
if return_inputs is True or return_inputs == "val":
601+
return (self.state.states_val[state_index], result)
602+
elif return_inputs == "ind":
603+
return (self.state.states_ind[state_index], result)
604+
else:
605+
return result
586606
else:
587607
if state_index is not None:
588608
raise ValueError("Task does not have a state")
589609
checksum = self.checksum
590610
result = load_result(checksum, self.cache_locations)
591-
return result
611+
if return_inputs is True or return_inputs == "val":
612+
inputs_val = {
613+
f"{self.name}.{inp}": getattr(self.inputs, inp)
614+
for inp in self.input_names
615+
}
616+
return (inputs_val, result)
617+
elif return_inputs == "ind":
618+
inputs_ind = {f"{self.name}.{inp}": None for inp in self.input_names}
619+
return (inputs_ind, result)
620+
else:
621+
return result
592622

593623
def _reset(self):
594624
"""Reset the connections between inputs and LazyFields."""

pydra/engine/helpers.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,3 +513,31 @@ def output_from_inputfields(output_spec, inputs):
513513
(field_name, attr.ib(type=File, metadata={"value": value}))
514514
)
515515
return output_spec
516+
517+
518+
def get_available_cpus():
519+
"""
520+
Return the number of CPUs available to the current process or, if that is not
521+
available, the total number of CPUs on the system.
522+
523+
Returns
524+
-------
525+
n_proc : :obj:`int`
526+
The number of available CPUs.
527+
"""
528+
# Will not work on some systems or if psutil is not installed.
529+
# See https://psutil.readthedocs.io/en/latest/#psutil.Process.cpu_affinity
530+
try:
531+
import psutil
532+
533+
return len(psutil.Process().cpu_affinity())
534+
except (AttributeError, ImportError, NotImplementedError):
535+
pass
536+
537+
# Not available on all systems, including macOS.
538+
# See https://docs.python.org/3/library/os.html#os.sched_getaffinity
539+
if hasattr(os, "sched_getaffinity"):
540+
return len(os.sched_getaffinity(0))
541+
542+
# Last resort
543+
return os.cpu_count()

pydra/engine/helpers_file.py

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,27 +87,66 @@ def hash_file(afile, chunk_len=8192, crypto=sha256, raise_notfound=True):
8787
return crypto_obj.hexdigest()
8888

8989

90-
def hash_dir(dirpath, raise_notfound=True):
90+
def hash_dir(
91+
dirpath,
92+
crypto=sha256,
93+
ignore_hidden_files=False,
94+
ignore_hidden_dirs=False,
95+
raise_notfound=True,
96+
):
97+
"""Compute hash of directory contents.
98+
99+
This function computes the hash of every file in directory `dirpath` and then
100+
computes the hash of that list of hashes to return a single hash value. The
101+
directory is traversed recursively.
102+
103+
Parameters
104+
----------
105+
dirpath : :obj:`str`
106+
Path to directory.
107+
crypto : :obj: `function`
108+
cryptographic hash functions
109+
ignore_hidden_files : :obj:`bool`
110+
If `True`, ignore filenames that begin with `.`.
111+
ignore_hidden_dirs : :obj:`bool`
112+
If `True`, ignore files in directories that begin with `.`.
113+
raise_notfound : :obj:`bool`
114+
If `True` and `dirpath` does not exist, raise `FileNotFound` exception. If
115+
`False` and `dirpath` does not exist, return `None`.
116+
117+
Returns
118+
-------
119+
hash : :obj:`str`
120+
Hash of the directory contents.
121+
"""
91122
from .specs import LazyField
92123

93124
if dirpath is None or isinstance(dirpath, LazyField) or isinstance(dirpath, list):
94125
return None
95126
if not Path(dirpath).is_dir():
96127
if raise_notfound:
97-
raise RuntimeError(f"Directory {dirpath} not found.")
128+
raise FileNotFoundError(f"Directory {dirpath} not found.")
98129
return None
99130

100-
def search_dir(path):
101-
path = Path(path)
102-
file_list = []
103-
for el in path.iterdir():
104-
if el.is_file():
105-
file_list.append(hash_file(el))
106-
else:
107-
file_list.append(search_dir(path / el))
108-
return file_list
131+
file_hashes = []
132+
for dpath, dirnames, filenames in os.walk(dirpath):
133+
# Sort in-place to guarantee order.
134+
dirnames.sort()
135+
filenames.sort()
136+
dpath = Path(dpath)
137+
if ignore_hidden_dirs and dpath.name.startswith(".") and str(dpath) != dirpath:
138+
continue
139+
for filename in filenames:
140+
if ignore_hidden_files and filename.startswith("."):
141+
continue
142+
this_hash = hash_file(dpath / filename)
143+
file_hashes.append(this_hash)
109144

110-
return search_dir(dirpath)
145+
crypto_obj = crypto()
146+
for h in file_hashes:
147+
crypto_obj.update(h.encode())
148+
149+
return crypto_obj.hexdigest()
111150

112151

113152
def _parse_mount_table(exit_code, output):

pydra/engine/tests/test_helpers.py

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
1+
import os
2+
import hashlib
13
from pathlib import Path
4+
import random
5+
import platform
26

37
import pytest
48
import cloudpickle as cp
59

610
from .utils import multiply
7-
from ..helpers import hash_value, hash_function, save, create_pyscript
11+
from ..helpers import (
12+
hash_value,
13+
hash_function,
14+
get_available_cpus,
15+
save,
16+
create_pyscript,
17+
)
818
from .. import helpers_file
919
from ..specs import File, Directory
1020

@@ -135,23 +145,70 @@ def test_hash_value_dir(tmpdir):
135145
with open(file_2, "w") as f:
136146
f.write("hi")
137147

138-
assert hash_value(tmpdir, tp=Directory) == hash_value([file_1, file_2], tp=File)
139-
assert hash_value(tmpdir, tp=Directory) == helpers_file.hash_dir(tmpdir)
148+
test_sha = hashlib.sha256()
149+
for fx in [file_1, file_2]:
150+
test_sha.update(helpers_file.hash_file(fx).encode())
151+
152+
bad_sha = hashlib.sha256()
153+
for fx in [file_2, file_1]:
154+
bad_sha.update(helpers_file.hash_file(fx).encode())
155+
156+
orig_hash = helpers_file.hash_dir(tmpdir)
157+
158+
assert orig_hash == test_sha.hexdigest()
159+
assert orig_hash != bad_sha.hexdigest()
160+
assert orig_hash == hash_value(tmpdir, tp=Directory)
140161

141162

142163
def test_hash_value_nested(tmpdir):
164+
hidden = tmpdir.mkdir(".hidden")
143165
nested = tmpdir.mkdir("nested")
144166
file_1 = tmpdir.join("file_1.txt")
145-
file_2 = nested.join("file_2.txt")
146-
file_3 = nested.join("file_3.txt")
147-
with open(file_1, "w") as f:
148-
f.write("hello")
149-
with open(file_2, "w") as f:
150-
f.write("hi")
151-
with open(file_3, "w") as f:
152-
f.write("hola")
167+
file_2 = hidden.join("file_2.txt")
168+
file_3 = nested.join(".file_3.txt")
169+
file_4 = nested.join("file_4.txt")
170+
171+
test_sha = hashlib.sha256()
172+
for fx in [file_1, file_2, file_3, file_4]:
173+
with open(fx, "w") as f:
174+
f.write(str(random.randint(0, 1000)))
175+
test_sha.update(helpers_file.hash_file(fx).encode())
176+
177+
orig_hash = helpers_file.hash_dir(tmpdir)
153178

154-
assert hash_value(tmpdir, tp=Directory) == hash_value(
155-
[file_1, [file_2, file_3]], tp=File
179+
assert orig_hash == test_sha.hexdigest()
180+
assert orig_hash == hash_value(tmpdir, tp=Directory)
181+
182+
nohidden_hash = helpers_file.hash_dir(
183+
tmpdir, ignore_hidden_dirs=True, ignore_hidden_files=True
156184
)
157-
assert hash_value(tmpdir, tp=Directory) == helpers_file.hash_dir(tmpdir)
185+
nohiddendirs_hash = helpers_file.hash_dir(tmpdir, ignore_hidden_dirs=True)
186+
nohiddenfiles_hash = helpers_file.hash_dir(tmpdir, ignore_hidden_files=True)
187+
188+
assert orig_hash != nohidden_hash
189+
assert orig_hash != nohiddendirs_hash
190+
assert orig_hash != nohiddenfiles_hash
191+
192+
file_3.remove()
193+
assert helpers_file.hash_dir(tmpdir) == nohiddenfiles_hash
194+
hidden.remove()
195+
assert helpers_file.hash_dir(tmpdir) == nohidden_hash
196+
197+
198+
def test_get_available_cpus():
199+
assert get_available_cpus() > 0
200+
try:
201+
import psutil
202+
203+
has_psutil = True
204+
except ImportError:
205+
has_psutil = False
206+
207+
if hasattr(os, "sched_getaffinity"):
208+
assert get_available_cpus() == len(os.sched_getaffinity(0))
209+
210+
if has_psutil and platform.system().lower() != "darwin":
211+
assert get_available_cpus() == len(psutil.Process().cpu_affinity())
212+
213+
if platform.system().lower() == "darwin":
214+
assert get_available_cpus() == os.cpu_count()

0 commit comments

Comments
 (0)