Commit 7fb54ef

Add type signatures to case_utils.case_file.create_file_node
This uses an alternative light-class definition style more focused on type signatures. The example origin is cited inline.

References:
* [AC-211] Add static type checking to CASE-Utilities-Python

Signed-off-by: Alex Nelson <alexander.nelson@nist.gov>
1 parent 8854561 commit 7fb54ef
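
For context, the "light-class definition style" in the message is the class form of typing.NamedTuple: fields are declared as annotated class attributes, and the resulting class is an immutable named tuple that a static checker such as mypy can reason about. A minimal sketch of the idea (the names and the describe() helper below are illustrative, not part of this commit):

    import typing

    class HashDict(typing.NamedTuple):
        filesize: int
        md5: str

    def describe(hashes: typing.Optional[HashDict]) -> str:
        # The Optional annotation forces callers of this sketch to handle None.
        if hashes is None:
            return "no hashes recorded"
        # Field access is attribute-style and typed: hashes.md5 is a str.
        return "%d bytes, MD5 %s" % (hashes.filesize, hashes.md5)

    # A static checker would reject, e.g., HashDict("not an int", 42),
    # whereas a plain dict() of hash values gives it nothing to verify.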

File tree

1 file changed: +38 -17 lines changed

case_utils/case_file/__init__.py

Lines changed: 38 additions & 17 deletions
@@ -36,7 +36,24 @@
 NS_UCO_VOCABULARY = rdflib.Namespace("https://unifiedcyberontology.org/ontology/uco/vocabulary#")
 NS_XSD = rdflib.XSD
 
-def create_file_node(graph, filepath, node_iri=None, node_prefix=DEFAULT_PREFIX, disable_hashes=False, disable_mtime=False):
+# Shortcut syntax for defining an immutable named tuple is noted here:
+# https://docs.python.org/3/library/typing.html#typing.NamedTuple
+# via the "See also" box here: https://docs.python.org/3/library/collections.html#collections.namedtuple
+class HashDict(typing.NamedTuple):
+    filesize : int
+    md5 : str
+    sha1 : str
+    sha256 : str
+    sha512 : str
+
+def create_file_node(
+    graph : rdflib.Graph,
+    filepath : str,
+    node_iri : typing.Optional[str] = None,
+    node_prefix : str = DEFAULT_PREFIX,
+    disable_hashes : bool = False,
+    disable_mtime : bool = False
+) -> rdflib.URIRef:
     r"""
     This function characterizes the file at filepath.
 
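
As the inline comments in the hunk above note, typing.NamedTuple is the annotated counterpart of collections.namedtuple, so HashDict instances behave like tuples: construction fixes the field values, and attempts to reassign them fail. A small comparison sketch under that reading (the sample instance below holds the hashes of zero bytes of input and is not from the commit):

    import collections
    import hashlib
    import typing

    # Untyped shortcut form, shown only for comparison.
    UntypedHashDict = collections.namedtuple(
        "UntypedHashDict", ["filesize", "md5", "sha1", "sha256", "sha512"])

    class HashDict(typing.NamedTuple):
        filesize: int
        md5: str
        sha1: str
        sha256: str
        sha512: str

    # Illustrative instance: the hashes of empty input.
    h = HashDict(
        0,
        hashlib.md5(b"").hexdigest(),
        hashlib.sha1(b"").hexdigest(),
        hashlib.sha256(b"").hexdigest(),
        hashlib.sha512(b"").hexdigest(),
    )
    print(h.filesize)   # 0
    # h.filesize = 1    # AttributeError: can't set attribute
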
@@ -121,20 +138,20 @@ def create_file_node(graph, filepath, node_iri=None, node_prefix=DEFAULT_PREFIX,
     ))
 
     # Compute hashes until they are re-computed and match once. (This is a lesson learned from working with a NAS that had a subtly faulty network cable.)
-    successful_hashdict = None
-    last_hashdict = dict()
+
+    successful_hashdict : typing.Optional[HashDict] = None
+    last_hashdict : typing.Optional[HashDict] = None
     for attempt_no in [0, 1, 2, 3]:
-        current_hashdict = dict()
         # Hash file's contents.
         # This hashing logic was partially copied from DFXML's walk_to_dfxml.py.
         md5obj = hashlib.md5()
         sha1obj = hashlib.sha1()
         sha256obj = hashlib.sha256()
         sha512obj = hashlib.sha512()
         stashed_error = None
+        byte_tally = 0
         with open(filepath, "rb") as in_fh:
             chunk_size = 2**22
-            byte_tally = 0
             while True:
                 buf = b""
                 try:
@@ -149,13 +166,15 @@ def create_file_node(graph, filepath, node_iri=None, node_prefix=DEFAULT_PREFIX,
                 sha1obj.update(buf)
                 sha256obj.update(buf)
                 sha512obj.update(buf)
-        current_hashdict["filesize"] = byte_tally
         if not stashed_error is None:
             raise stashed_error
-        current_hashdict["md5"] = md5obj.hexdigest()
-        current_hashdict["sha1"] = sha1obj.hexdigest()
-        current_hashdict["sha256"] = sha256obj.hexdigest()
-        current_hashdict["sha512"] = sha512obj.hexdigest()
+        current_hashdict = HashDict(
+            byte_tally,
+            md5obj.hexdigest(),
+            sha1obj.hexdigest(),
+            sha256obj.hexdigest(),
+            sha512obj.hexdigest()
+        )
         if last_hashdict == current_hashdict:
             successful_hashdict = current_hashdict
             break
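
The two hunks above keep the existing retry strategy of hashing the file repeatedly until two consecutive passes agree, but accumulate each pass into a HashDict instead of a dict. Because a NamedTuple compares by value like any tuple, the last_hashdict == current_hashdict check is unchanged in behavior. A condensed sketch of that pattern (hash_once and hash_until_stable are hypothetical helpers, and only MD5 is shown):

    import hashlib
    import typing

    class HashDict(typing.NamedTuple):
        filesize: int
        md5: str

    def hash_once(filepath: str) -> HashDict:
        # One full pass over the file, mirroring the chunked read above.
        md5obj = hashlib.md5()
        byte_tally = 0
        with open(filepath, "rb") as in_fh:
            while True:
                buf = in_fh.read(2**22)
                if buf == b"":
                    break
                byte_tally += len(buf)
                md5obj.update(buf)
        return HashDict(byte_tally, md5obj.hexdigest())

    def hash_until_stable(filepath: str, max_attempts: int = 4) -> HashDict:
        last: typing.Optional[HashDict] = None
        for _ in range(max_attempts):
            current = hash_once(filepath)
            if last == current:
                # NamedTuple equality is field-by-field, like tuple equality.
                return current
            last = current
        raise ValueError("Failed to confirm hashes of file %r." % filepath)
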
@@ -165,22 +184,23 @@ def create_file_node(graph, filepath, node_iri=None, node_prefix=DEFAULT_PREFIX,
         del current_hashdict
     if successful_hashdict is None:
         raise ValueError("Failed to confirm hashes of file %r." % filepath)
-    if successful_hashdict["filesize"] != file_stat.st_size:
+    if successful_hashdict.filesize != file_stat.st_size:
         # TODO - Discuss with AC whether this should be something stronger, like an assertion error.
         warnings.warn(
-            "Inode file size and hashed file sizes disagree: %d vs. %d.",
-            file_stat.st_size,
-            successful_hashdict["filesize"]
+            "Inode file size and hashed file sizes disagree: %d vs. %d." % (
+                file_stat.st_size,
+                successful_hashdict.filesize
+            )
         )
     # TODO - Discuss whether this property should be recorded even if hashes are not attempted.
     graph.add((
         n_contentdata_facet,
         NS_UCO_OBSERVABLE.sizeInBytes,
-        rdflib.Literal(successful_hashdict["filesize"])
+        rdflib.Literal(successful_hashdict.filesize)
     ))
 
     # Add confirmed hashes into graph.
-    for key in successful_hashdict:
+    for key in successful_hashdict._fields:
         if not key in ("md5", "sha1", "sha256", "sha512"):
             continue
         n_hash = rdflib.BNode()
@@ -199,10 +219,11 @@ def create_file_node(graph, filepath, node_iri=None, node_prefix=DEFAULT_PREFIX,
             NS_UCO_TYPES.hashMethod,
             rdflib.Literal(key.upper(), datatype=NS_UCO_VOCABULARY.HashNameVocab)
         ))
+        hash_value = getattr(successful_hashdict, key)
         graph.add((
             n_hash,
             NS_UCO_TYPES.hashValue,
-            rdflib.Literal(successful_hashdict[key].upper(), datatype=NS_XSD.hexBinary)
+            rdflib.Literal(hash_value.upper(), datatype=NS_XSD.hexBinary)
         ))
 
     return n_file
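
The last two hunks replace dict-key iteration with the named tuple's _fields attribute and getattr(), which preserves the loop over hash names while keeping attribute-style access everywhere else. A minimal sketch of that pattern in isolation (NS_EX and the sample values are illustrative, not from this file):

    import typing

    import rdflib

    class HashDict(typing.NamedTuple):
        filesize: int
        md5: str
        sha1: str

    NS_EX = rdflib.Namespace("http://example.org/")

    graph = rdflib.Graph()
    h = HashDict(
        0,
        "d41d8cd98f00b204e9800998ecf8427e",
        "da39a3ee5e6b4b0d3255bfef95601890afd80709",
    )
    for key in h._fields:
        if key not in ("md5", "sha1"):
            # Skip non-hash fields such as filesize.
            continue
        n_hash = rdflib.BNode()
        graph.add((n_hash, NS_EX.hashMethod, rdflib.Literal(key.upper())))
        graph.add((n_hash, NS_EX.hashValue, rdflib.Literal(getattr(h, key).upper())))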
