diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b92454b..a2b4da1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,6 +2,7 @@ name: Continuous Integration on: push: + pull_request: jobs: build: @@ -42,10 +43,6 @@ jobs: case-path: ./ case-version: "case-1.3.0" extension-filter: "jsonld" - report-in-pr: "true" - github-token: ${{ secrets.GITHUB_TOKEN }} - repository: ${{ github.repository }} - pull-request: ${{ github.event.pull_request.number }} # Always build the package as a sanity check to ensure no issues with the build system # exist as part of the CI process diff --git a/.gitignore b/.gitignore index 69f1c53..ad89e2d 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,13 @@ .venv/ venv/ +# Wing IDE +*wpr +*wpu + +__pycache__/ +*.pyc + # Build Artifacts build/ dist/ diff --git a/case.jsonld b/case.jsonld index d37a918..f87e17a 100644 --- a/case.jsonld +++ b/case.jsonld @@ -295,8 +295,8 @@ "uco-core:hasFacet": [ { "@id": "0f0b71e7-39a8-5249-83a5-b52888d5c8b7", - "@type": "uco-observable:PhoneAccountFacet", - "uco-observable:phoneNumber": "123456" + "@type": "uco-observable:URLFacet", + "uco-observable:fullValue": "www.docker.com/howto" } ] }, @@ -306,17 +306,116 @@ "uco-core:hasFacet": [ { "@id": "d5797860-8d73-5a53-a6fc-660a4e8c64e2", + "@type": "uco-observable:ApplicationFacet", + "uco-core:name": "Safari" + } + ] + }, + { + "@id": "997f05df-f309-588d-829f-c20e3cbadb57", + "@type": "uco-observable:ObservableObject", + "uco-core:hasFacet": [ + { + "@id": "697d25c6-00bd-571e-aebd-5a96e34ea4b5", + "@type": "uco-observable:URLHistoryFacet", + "uco-observable:browserInformation": { + "@id": "9af4324d-cb7b-54d7-aba4-95714396435e" + }, + "uco-observable:urlHistoryEntry": [ + { + "@id": "641fc5c4-49a3-573e-bdb2-d6d8dc000fee", + "@type": "uco-observable:URLHistoryEntry", + "uco-observable:browserUserProfile": "Jill", + "uco-observable:expirationTime": { + "@type": "xsd:dateTime", + "@value": "2024-12-27T14:55:01+00:00" + }, + "uco-observable:firstVisit": { + "@type": "xsd:dateTime", + "@value": "2024-01-02T15:55:01+00:00" + }, + "uco-observable:hostname": "case_test", + "uco-observable:keywordSearchTerm": "docker", + "uco-observable:lastVisit": { + "@type": "xsd:dateTime", + "@value": "2024-02-10T10:55:01+00:00" + }, + "uco-observable:manuallyEnteredCount": { + "@type": "xsd:nonNegativeInteger", + "@value": "10" + }, + "uco-observable:pageTitle": "Docker tutorial", + "uco-observable:url": { + "@id": "41bc2540-fdd8-5753-864c-00a5de7d0d2e" + }, + "uco-observable:visitCount": { + "@type": "xsd:integer", + "@value": "18" + } + }, + { + "@id": "5860f49a-8471-5984-9610-402672d63f56", + "@type": "uco-observable:URLHistoryEntry", + "uco-observable:browserUserProfile": "Tamasin", + "uco-observable:expirationTime": { + "@type": "xsd:dateTime", + "@value": "2024-12-27T14:55:01+00:00" + }, + "uco-observable:firstVisit": { + "@type": "xsd:dateTime", + "@value": "2024-01-02T15:55:01+00:00" + }, + "uco-observable:hostname": "case_test", + "uco-observable:keywordSearchTerm": "git actions", + "uco-observable:lastVisit": { + "@type": "xsd:dateTime", + "@value": "2024-02-10T10:55:01+00:00" + }, + "uco-observable:manuallyEnteredCount": { + "@type": "xsd:nonNegativeInteger", + "@value": "21" + }, + "uco-observable:pageTitle": "GitHub actions tutorial", + "uco-observable:url": { + "@id": "41bc2540-fdd8-5753-864c-00a5de7d0d2e" + }, + "uco-observable:visitCount": { + "@type": "xsd:integer", + "@value": "38" + } + } + ] + } + ] + }, + { + "@id": "6e0ac6ce-6afc-58c6-b4b6-501fa5de2699", + "@type": "uco-observable:ObservableObject", + "uco-core:hasFacet": [ + { + "@id": "00f08d34-2abd-53e3-b758-cd5ba00300c9", + "@type": "uco-observable:PhoneAccountFacet", + "uco-observable:phoneNumber": "123456" + } + ] + }, + { + "@id": "3d6f634e-7c5d-5ff8-b996-8d738bb2793a", + "@type": "uco-observable:ObservableObject", + "uco-core:hasFacet": [ + { + "@id": "17525359-a042-5503-ad5f-f8f9bcd53472", "@type": "uco-observable:PhoneAccountFacet", "uco-observable:phoneNumber": "987654" } ] }, { - "@id": "997f05df-f309-588d-829f-c20e3cbadb57", + "@id": "1c09087a-0577-5025-a23b-c3e0155a5bf7", "@type": "uco-observable:ObservableObject", "uco-core:hasFacet": [ { - "@id": "5860f49a-8471-5984-9610-402672d63f56", + "@id": "0336f3a3-ff74-5e34-97bb-14534316f393", "@type": "uco-observable:MessageFacet", "uco-observable:messageText": "Are you free this weekend?", "uco-observable:sentTime": { @@ -324,39 +423,39 @@ "@value": "2023-01-01T01:08:08.000008+00:00" }, "uco-observable:from": { - "@id": "41bc2540-fdd8-5753-864c-00a5de7d0d2e" + "@id": "6e0ac6ce-6afc-58c6-b4b6-501fa5de2699" }, "uco-observable:to": [ { - "@id": "41bc2540-fdd8-5753-864c-00a5de7d0d2e" + "@id": "6e0ac6ce-6afc-58c6-b4b6-501fa5de2699" }, { - "@id": "9af4324d-cb7b-54d7-aba4-95714396435e" + "@id": "3d6f634e-7c5d-5ff8-b996-8d738bb2793a" } ], "uco-observable:application": { - "@id": "697d25c6-00bd-571e-aebd-5a96e34ea4b5" + "@id": "80c8f41b-e720-51db-b650-1df11b5f2acb" } } ] }, { - "@id": "697d25c6-00bd-571e-aebd-5a96e34ea4b5", + "@id": "80c8f41b-e720-51db-b650-1df11b5f2acb", "@type": "uco-observable:ObservableObject", "uco-core:hasFacet": [ { - "@id": "641fc5c4-49a3-573e-bdb2-d6d8dc000fee", + "@id": "35fb4ae9-6e55-59b7-a797-6aff1494a314", "@type": "uco-observable:ApplicationFacet", "uco-core:name": "WhatsApp" } ] }, { - "@id": "6e0ac6ce-6afc-58c6-b4b6-501fa5de2699", + "@id": "6d5906a1-df3f-5aa3-8c0d-c76c570d8b96", "@type": "uco-identity:Identity", "uco-core:hasFacet": [ { - "@id": "3d6f634e-7c5d-5ff8-b996-8d738bb2793a", + "@id": "742d8932-3fe2-5c0f-afc4-9dd2e20533e3", "@type": "uco-identity:BirthInformationFacet", "uco-identity:birthdate": { "@type": "xsd:dateTime", @@ -364,7 +463,7 @@ } }, { - "@id": "00f08d34-2abd-53e3-b758-cd5ba00300c9", + "@id": "24cd8876-1c97-5fce-9976-8733f6e901f6", "@type": "uco-identity:SimpleNameFacet", "uco-identity:givenName": "Davey", "uco-identity:familyName": "Jones" @@ -372,11 +471,11 @@ ] }, { - "@id": "17525359-a042-5503-ad5f-f8f9bcd53472", + "@id": "32599b39-84d2-5471-9452-c494171897fd", "@type": "uco-location:Location", "uco-core:hasFacet": [ { - "@id": "1c09087a-0577-5025-a23b-c3e0155a5bf7", + "@id": "6f716a17-3a29-513a-9e21-8bf6112c1d50", "@type": "uco-location:LatLongCoordinatesFacet", "uco-location:latitude": { "@type": "xsd:decimal", @@ -390,7 +489,7 @@ ] }, { - "@id": "80c8f41b-e720-51db-b650-1df11b5f2acb", + "@id": "3344007e-c9c4-5437-9766-4001fc9f3af5", "@type": "case-investigation:Investigation", "uco-core:name": "Crime A", "case-investigation:focus": "Transfer of Illicit Materials", @@ -510,7 +609,7 @@ ] }, { - "@id": "0336f3a3-ff74-5e34-97bb-14534316f393", + "@id": "bb9cd337-eb51-50dd-9cee-00f9e9acbf94", "@type": "uco-observable:Message", "uco-observable:state": "some state", "uco-observable:hasChanged": { @@ -521,45 +620,45 @@ "olo:slot": null, "uco-core:hasFacet": [ { - "@id": "6d5906a1-df3f-5aa3-8c0d-c76c570d8b96", + "@id": "73af707b-f00e-5327-8fd4-83467b4441c4", "@type": "uco-observable:MessageFacet" } ] }, { - "@id": "24cd8876-1c97-5fce-9976-8733f6e901f6", + "@id": "7c4d5cff-deb0-5031-b881-c572d6ef2fd7", "@type": "uco-observable:Message", "olo:length": null, "olo:slot": null, "uco-core:hasFacet": [ { - "@id": "742d8932-3fe2-5c0f-afc4-9dd2e20533e3", + "@id": "551338f3-0bb6-5325-a63a-f164f8750a17", "@type": "uco-observable:MessageFacet" } ] }, { - "@id": "32599b39-84d2-5471-9452-c494171897fd", + "@id": "09b10c04-d04b-5808-a1b6-0392dd3a00e8", "@type": "uco-observable:Message", "olo:length": null, "olo:slot": null, "uco-core:hasFacet": [ { - "@id": "6f716a17-3a29-513a-9e21-8bf6112c1d50", + "@id": "a19e1e1f-3953-5fb9-92b6-2b46f85752b2", "@type": "uco-observable:MessageFacet" } ] }, { - "@id": "35fb4ae9-6e55-59b7-a797-6aff1494a314", + "@id": "562c20c5-bef7-5919-a969-6c820a80e0d3", "@type": "uco-observable:MessageThread", "uco-core:hasFacet": [ { - "@id": "3344007e-c9c4-5437-9766-4001fc9f3af5", + "@id": "6f79d4ae-d92c-5cad-bbe5-a0afde6f475a", "@type": "uco-observable:MessageThreadFacet", "uco-observable:displayName": "some name", "uco-observable:messageThread": { - "@id": "562c20c5-bef7-5919-a969-6c820a80e0d3", + "@id": "56f74818-1d3d-51f9-8cb1-d6bdc8ecee60", "@type": "uco-types:Thread", "co:size": { "@type": "xsd:nonNegativeInteger", @@ -567,13 +666,13 @@ }, "co:element": [ { - "@id": "0336f3a3-ff74-5e34-97bb-14534316f393" + "@id": "09b10c04-d04b-5808-a1b6-0392dd3a00e8" }, { - "@id": "24cd8876-1c97-5fce-9976-8733f6e901f6" + "@id": "7c4d5cff-deb0-5031-b881-c572d6ef2fd7" }, { - "@id": "32599b39-84d2-5471-9452-c494171897fd" + "@id": "bb9cd337-eb51-50dd-9cee-00f9e9acbf94" } ] } diff --git a/case_mapping/mix_utils.py b/case_mapping/mix_utils.py new file mode 100644 index 0000000..ae3d9b8 --- /dev/null +++ b/case_mapping/mix_utils.py @@ -0,0 +1,37 @@ +from typing import Any, Callable + + +def check_value( + *args: Any, + value: str, + list_values: list[str], + list_objects: list[dict[str, Any]], + observable_generating_f: Callable[..., dict[str, Any]] +) -> dict[str, Any]: + """It checks if a specific value has been already generated related to an ObservableObject relying on + the list of its values. This is meant to avoid duplication in the JSON/CASE file generated by the + parsers (UFED, AXIOM etc.). + If the value is not in the list_values, a new ObservableObject is generated by using the function + observable_generating_f that returns, as a result, the new ObservableObject (e.g. uco-observable:ApplicationFacet, + uco-observable:AccountFacet, uco-location:LatLongCoordinatesFacet: drafting:SearchedItemFacet, "uco-observable:URLFacet, + uco-observable:ApplicationAccountFacet, uco-observable:DigitalAccountFacet, uco-observable:PhoneAccountFacet). + + Finally the new ObservableObject is added to the list_objects (any kind of ObservableObject maintains a different list). + If the value is already in the list_values, the ObservableObject list_objects[index] is returned. + + :param value: the value to be checked within the list_values + :param list_values: the current list of values + :param list_objects: the current list of a specific kind of ObservableObject + :param observable_generating_f: the function that will generate the corresponding kind of ObservableObject + :param *args: the actual parameter of the observable_generating_f function + :return: an Observableobject of a specific kind depending by the actual parameters + """ + if value in list_values: + idx = list_values.index(value) + observable_app = list_objects[idx] + else: + observable_app = observable_generating_f(*args) + list_values.append(value) + list_objects.append(observable_app) + + return observable_app diff --git a/case_mapping/uco/observable.py b/case_mapping/uco/observable.py index 04e71da..2fb00ba 100644 --- a/case_mapping/uco/observable.py +++ b/case_mapping/uco/observable.py @@ -373,78 +373,168 @@ def __init__(self, has_changed=None, state=None, facets=None): class FacetUrlHistory(FacetEntity): - def __init__(self, browser_info, history_entries=None): + def __init__(self, browser=None, history_entries=None): """ :param browser_info: An observable object containing a URLHistoryFacet - :param history_entries: A list of URLHistoryEntry types + :param history_entries: A list of dictionaries, each dict has the + following keys: + "uco-observable:browserUserProfile": str, + "uco-observable:expirationTime" : datetime, + "uco-observable:firstVisit": datetime, + "uco-observable:hostname": str, + "uco-observable:keywordSearchTerm": str, + "uco-observable:lastVisit" : datetime, + "uco-observable:manuallyEnteredCount": non negative int + "uco-observable:pageTitle": str, + "uco-observable:ble:referrerUrl": url_object, + "uco-observable:url": url_object, + "uco-observable:visitCount": int, """ - super().__init__() self["@type"] = "uco-observable:URLHistoryFacet" - self._node_reference_vars(**{"uco-observable:browserInformation": browser_info}) - self.append_history_entries(history_entries) - - @unpack_args_array - def append_history_entries(self, *args): - """ - Used to add history entries to this URL History facet - :param args: A single/tuple of URLHistoryEntry class types - """ - self._append_observable_objects("uco-observable:urlHistoryEntry", *args) - - -class UrlHistoryEntry(FacetEntity): - def __init__( - self, - first_visit=None, - last_visit=None, - expiration_time=None, - manually_entered_count=None, - url=None, - user_profile=None, - page_title=None, - referrer_url=None, - visit_count=None, - keyword_search_term=None, - allocation_status=None, - ): - """ - :param first_visit: - :param last_visit: - :param expiration_time: - :param manually_entered_count: - :param url: An observable object with a URLFacet - :param user_profile: - :param page_title: - :param referrer_url: - :param visit_count: - :param keyword_search_term: - :param allocation_status: - """ - - super().__init__() - self["@type"] = "uco-observable:URLHistoryEntry" - self._str_vars( + self._node_reference_vars( **{ - "uco-observable:userProfile": user_profile, # todo: referral? - "uco-observable:pageTitle": page_title, - "uco-observable:referrerUrl": referrer_url, - "uco-observable:keywordSearchTerm": keyword_search_term, - "uco-observable:allocationStatus": allocation_status, + "uco-observable:browserInformation": browser, } ) - self._int_vars(**{"uco-observable:visitCount": visit_count}) - self._datetime_vars( - **{ - "uco-observable:firstVisit": first_visit, - "uco-observable:lastVisit": last_visit, - "uco-observable:expirationTime": expiration_time, - } + + keys_str = ( + "uco-observable:browserUserProfile", + "uco-observable:hostname", + "uco-observable:pageTitle", + "uco-observable:keywordSearchTerm", + "uco-observable:urlHistoryEntry", ) - self._nonegative_int_vars( - **{"uco-observable:manuallyEnteredCount": manually_entered_count} + keys_datetime = ( + "uco-observable:firstVisit", + "uco-observable:lastVisit", + "uco-observable:expirationTime", ) - self._node_reference_vars(**{"uco-observable:url": url}) + keys_int = "uco-observable:visitCount" + keys_ref = ("uco-observable:ble:referrerUrl", "uco-observable:url") + + self["uco-observable:urlHistoryEntry"] = [] + for entry in history_entries: + history_entry = {} + history_entry["@id"] = local_uuid() + history_entry["@type"] = "uco-observable:URLHistoryEntry" + for key, var in entry.items(): + if key in keys_str: + if isinstance(var, str): + history_entry[key] = var + else: + self.__handle_var_type_errors(key, var, "str") + elif key in keys_datetime: + if isinstance(var, datetime): + tz_info = var.strftime("%z") + iso_format = ( + var.isoformat() if tz_info else var.isoformat() + "+00:00" + ) + history_entry[key] = { + "@type": "xsd:dateTime", + "@value": iso_format, + } + else: + self.__handle_var_type_errors(key, var, "datetime") + elif key in keys_int: + if isinstance(var, int): + history_entry[key] = { + "@type": "xsd:integer", + "@value": str(var), + } + else: + self.__handle_var_type_errors(key, var, "int") + elif key in keys_ref: + if isinstance(var, list) or isinstance(var, tuple): + is_object_entity = [ + isinstance(item, ObjectEntity) for item in var + ] + if all(is_object_entity): + history_entry[key] = [ + {"@id": item.get_id()} for item in var + ] + else: + self.__handle_list_type_errors( + key, var, "ObjectEntity (no @id key)" + ) + elif isinstance(var, ObjectEntity): + history_entry[key] = {"@id": var.get_id()} + else: + self.__handle_var_type_errors( + key, var, "ObjectEntity (no @id key)" + ) + elif key == "uco-observable:manuallyEnteredCount": + history_entry[key] = { + "@type": "xsd:nonNegativeInteger", + "@value": "%d" % var, + } + + self["uco-observable:urlHistoryEntry"].append(history_entry) + + +# class UrlHistoryEntry(FacetEntity): +# It's no longer necessary, all data are included in the above FacetUrlHistory class +# def __init__( +# self, +# browser_user_profile=None, +# expiration_time=None, +# first_visit=None, +# host_name=None, +# keyword_search_term=None, +# last_visit=None, +# manually_entered_count=None, +# page_title=None, +# referrer_url=None, +# url=None, +# visit_count=None, +# ): +# """ +# :param browser_user_profile: The web browser user profile for which the URL history entry was created. +# :param expiration_time: The date and time at which the validity of the object expires. +# :param first_visit: The date/time that the URL referred to by the URL field was first visited. +# :param host_name: The hostname of the system. +# :param keyword_search_term: The string representing a keyword search term contained within the URL field. +# :param last_visit: The date/time that the URL referred to by the URL field was last visited. +# :param manually_entered_count: The number of times the URL referred to by the URL field was manually entered into the browser's address field by the user. +# :param page_title: The title of a web page +# :param referrer_url: The origination point (i.e., URL) of a URL request. +# :param url: An observable object with a URLFacet. +# :param visit_count: The number of times a URL has been visited by a particular web browser. +# """ + +# super().__init__() +# self["@type"] = "uco-observable:URLHistoryEntry" +# self._str_vars( +# **{ +# "uco-observable:browserUserProfile": browser_user_profile, +# "uco-observable:hostname": host_name, +# "uco-observable:pageTitle": page_title, +# "uco-observable:keywordSearchTerm": keyword_search_term, +# } +# ) +# self._int_vars(**{"uco-observable:visitCount": visit_count}) +# self._datetime_vars( +# **{ +# "uco-observable:firstVisit": first_visit, +# "uco-observable:lastVisit": last_visit, +# "uco-observable:expirationTime": expiration_time, +# } +# ) +# self._node_reference_vars( +# **{ +# "uco-observable:ble:referrerUrl": referrer_url, +# "uco-observable:url": url, +# } +# ) +# # TODO AJN: This is one instance of xsd:nonNegativeInteger. +# # There are other instances in the ontology requiring +# # nonNegativeIntegers. Hence, the FacetEntity class needs to +# # have a helper function added. +# # https://github.com/casework/CASE-Mapping-Python/issues/37 +# self["uco-observable:manuallyEnteredCount"] = { +# "@type": "xsd:nonNegativeInteger", +# "@value": "%d" % manually_entered_count, +# } class FacetUrl(FacetEntity): @@ -488,6 +578,47 @@ def __init__( self._int_vars(**{"uco-observable:port": url_port}) +class FacetBrowserBookmark(FacetEntity): + def __init__( + self, + accessedTime=None, + application_id=None, + bookmarkPath=None, + modifiedTime=None, + createdTime=None, + urlTargeted_id=None, + visitCount=None, + ): + """ + This CASEObject represents a grouping of characteristics unique to a saved shortcut that directs a + WWW (World Wide Web) browser software program to a particular WWW accessible resource. + :param accessedTime: The date and time at which the Object was accessed (dateTime). + :param application_id: The application associated with this object (ObservableObject). + :param bookmarkPath: The folder containing the bookmark (string). + :param modifiedTime: The date and time at which the Object was last modified (dateTime). + :param createdTime: The date and time at which the observable object being characterized was created (dateTime). + :param urlTargeted_id: The target of the bookmark. (anyURI). + :param visitCount: Specifies the number of times a URL has been visited by a particular web browser (integer). + """ + super().__init__() + self["@type"] = "uco-observable:BrowserBookmarkFacet" + self._str_vars(**{"observable:bookmarkPath": bookmarkPath}) + self._int_vars(**{"uco-observable:visitCount": visitCount}) + self._node_reference_vars( + **{ + "uco-observable:application": application_id, + "uco-observable:urlTargeted": urlTargeted_id, + } + ) + self._datetime_vars( + **{ + "uco-observable:observableCreatedTime": accessedTime, + "uco-observable:modifiedTime": modifiedTime, + "uco-observable:accessedTime": accessedTime, + } + ) + + class FacetRasterPicture(FacetEntity): def __init__( self, @@ -1015,6 +1146,8 @@ def __init__( cyber_action=None, computer_name=None, created_time=None, + start_time=None, + end_time=None, ): """ An event facet is a grouping of characteristics unique to something that happens in a digital context @@ -1023,10 +1156,12 @@ def __init__( :param event_text: The textual representation of the event. :param event_id: The identifier of the event. :param cyber_action: The action taken in response to the event. - :param computer_name: A name of the computer on which the log entry was created. + :param created_time: The date and time at which the observable object being characterized was created. + :param start_time: The date and time at which the observable object being characterized started. + :param end_time: The date and time at which the observable object being characterized ended. """ super().__init__() - self["@type"] = "uco-observable:EventFacet" + self["@type"] = "uco-observable:EventRecordFacet" self._str_vars( **{ "uco-observable:eventType": event_type, @@ -1036,7 +1171,12 @@ def __init__( } ) self._node_reference_vars(**{"uco-observable:cyberAction": cyber_action}) - self._datetime_vars(**{"uco-observable:observableCreatedTime": created_time}) + self._datetime_vars( + **{ + "uco-observable:startTime": start_time, + "uco-observable:endTime": end_time, + } + ) class ObservableRelationship(ObjectEntity): @@ -1349,7 +1489,7 @@ def __init__(self, disk_type=None, size=None, partition=None): "uco-observable:BluetoothAddressFacet": BluetoothAddress, "uco-observable:ObservableObject": ObservableObject, "uco-observable:URLHistoryFacet": FacetUrlHistory, - "uco-observable:URLHistoryEntry": UrlHistoryEntry, + # "uco-observable:URLHistoryEntry": UrlHistoryEntry, "uco-observable:URLFacet": FacetUrl, "uco-observable:RasterPictureFacet": FacetRasterPicture, "uco-observable:CallFacet": FacetCall, diff --git a/case_mapping/uco/tool.py b/case_mapping/uco/tool.py index d7b6913..52785c4 100644 --- a/case_mapping/uco/tool.py +++ b/case_mapping/uco/tool.py @@ -8,7 +8,7 @@ def __init__( """ The Uco tool is a way to define the specifics of a tool used in an investigation :param tool_name: The name of the tool (e.g., "exiftool") - :param tool_creator: The developer and or organisation that produces this tool {might need to add a dict here} + :param tool_creator: An ObservableObject The developer and or organisation that produces this tool {might need to add a dict here} :param tool_type: The type of tool :param tool_version: The version of the tool """ @@ -19,9 +19,9 @@ def __init__( "uco-core:name": tool_name, "uco-tool:version": tool_version, "uco-tool:toolType": tool_type, - "uco-tool:creator": tool_creator, } ) + self._node_reference_vars(**{"uco-tool:creator": tool_creator}) directory = {"uco-tool:Tool": Tool} diff --git a/example.py b/example.py index 24eea7a..0f2d43d 100644 --- a/example.py +++ b/example.py @@ -5,7 +5,7 @@ from case_mapping import case, uco # This is part of enabling non-random UUIDs for the demonstration -# output. The other part is handled at call time, and can be seen in +# output. The other part is handled at call time, and can be seen in # the documentation for cdo_local_uuid._demo_uuid(). cdo_local_uuid.configure() @@ -14,7 +14,10 @@ def _next_timestamp() -> datetime: """ - This example previously used datetime.now(timezone.utc) to generate timestamps. This function instead creates a fixed-value timestamp sequence, to reduce diff noise from timestamps when re-running the example script. + This example previously used datetime.now(timezone.utc) to generate + timestamps. This function instead creates a fixed-value timestamp + sequence, to reduce diff noise from timestamps when re-running the + example script. """ global _current_timestamp_count base_timestamp = datetime(2023, 1, 1, 1, 1, 1, 1, timezone.utc) @@ -118,7 +121,7 @@ def _next_timestamp() -> datetime: bundle.append_to_uco_object(cyber_rel1) ############################## -# Adding an Email Account # # NOTE: Changes here compared to previous version +# Adding an Email Account # ############################## email_address_object_1 = uco.observable.ObservableObject() email_address_1 = uco.observable.FacetEmailAddress( @@ -159,6 +162,63 @@ def _next_timestamp() -> datetime: bundle.append_to_uco_object(cyber_item3) +################################################### +# Adding an FacetUrlHistory and aUrlHistoryEntry # +################################################### +url_object = uco.observable.ObservableObject() +url_facet = uco.observable.FacetUrl(url_address="www.docker.com/howto") +url_object.append_facets(url_facet) +bundle.append_to_uco_object(url_object) + +browser_object = uco.observable.ObservableObject() +browser_facet = uco.observable.FacetApplication(app_name="Safari") +browser_object.append_facets(browser_facet) +bundle.append_to_uco_object(browser_object) + +url_date_expiration = datetime.strptime("2024-12-27T14:55:01", "%Y-%m-%dT%H:%M:%S") +url_date_first = datetime.strptime("2024-01-02T15:55:01", "%Y-%m-%dT%H:%M:%S") +url_date_last = datetime.strptime("2024-02-10T10:55:01", "%Y-%m-%dT%H:%M:%S") + +history_entries = [] +history_entry_1 = { + "uco-observable:browserUserProfile": "Jill", + "uco-observable:expirationTime": url_date_expiration, + "uco-observable:firstVisit": url_date_first, + "uco-observable:hostname": "case_test", + "uco-observable:keywordSearchTerm": "docker", + "uco-observable:lastVisit": url_date_last, + "uco-observable:manuallyEnteredCount": 10, + "uco-observable:pageTitle": "Docker tutorial", + "uco-observable:referrerUrl": url_object, + "uco-observable:url": url_object, + "uco-observable:visitCount": 18, +} +history_entry_2 = { + "uco-observable:browserUserProfile": "Tamasin", + "uco-observable:expirationTime": url_date_expiration, + "uco-observable:firstVisit": url_date_first, + "uco-observable:hostname": "case_test", + "uco-observable:keywordSearchTerm": "git actions", + "uco-observable:lastVisit": url_date_last, + "uco-observable:manuallyEnteredCount": 21, + "uco-observable:pageTitle": "GitHub actions tutorial", + "uco-observable:referrerUrl": url_object, + "uco-observable:url": url_object, + "uco-observable:visitCount": 38, +} + +url_history_entry_object = uco.observable.ObservableObject() + +history_entries.append(history_entry_1) +history_entries.append(history_entry_2) +url_history_facet = uco.observable.FacetUrlHistory( + browser=browser_object, history_entries=history_entries +) + +url_history_entry_object.append_facets(url_history_facet) +bundle.append_to_uco_object(url_history_entry_object) + + ############################ # Adding an SMS Account # ############################ diff --git a/tests/test_duplicate.py b/tests/test_duplicate.py new file mode 100644 index 0000000..aed691d --- /dev/null +++ b/tests/test_duplicate.py @@ -0,0 +1,145 @@ +import uuid +from typing import Any, Union + +from case_mapping import mix_utils + + +def check_app_name( + app_name: str, app_names: list[str], app_objects: list[dict[str, Any]], uuid: str +) -> dict[str, Any]: + # c_check = mix_utils.CheckDuplicate() + observable_app = mix_utils.check_value( + app_name, + uuid, + value=app_name, + list_values=app_names, + list_objects=app_objects, + observable_generating_f=generateTraceAppName, + ) + return observable_app + + +def check_geo_coordinates( + latitude: float, + longitude: float, + geo_coordinates: list[str], + geo_objects: list[dict[str, Any]], + uuid: str, +) -> dict[str, Any]: + # c_check = mix_utils.CheckDuplicate() + observable_app = mix_utils.check_value( + latitude, + longitude, + uuid, + value=str(latitude) + "#" + str(longitude), + list_values=geo_coordinates, + list_objects=geo_objects, + observable_generating_f=generateTraceLocationCoordinate, + ) + return observable_app + + +def generateTraceAppName(app_name: str, uuid: str) -> dict[str, Any]: + observable = { + "@type": "uco-observable:ApplicationFacet", + "@id": uuid, + "uco-core:name": app_name, + } + return observable + + +def generateTraceLocationCoordinate( + latitude: Union[float, str], longitude: Union[float, str], uuid: str +) -> dict[str, Any]: + observable = { + "@type": "uco-location:LatLongCoordinatesFacet", + "@id": uuid, + "uco-location:latitude": {"@type": "xsd:decimal", "@value": str(latitude)}, + "uco-location:longitude": {"@type": "xsd:decimal", "@value": str(longitude)}, + } + return observable + + +def test_app_name() -> None: + app_names: list[str] = list() + app_objects: list[dict[str, Any]] = list() + app_1 = "Safari" + uuid_1 = "kb:" + str(uuid.uuid4()) + check_app_name(app_1, app_names, app_objects, uuid_1) + assert app_names == [app_1] + app_2 = "Chrome" + uuid_2 = "kb:" + str(uuid.uuid4()) + check_app_name(app_2, app_names, app_objects, uuid_2) + assert app_names == [app_1, app_2] + uuid_3 = "kb:" + str(uuid.uuid4()) + object_app = check_app_name(app_1, app_names, app_objects, uuid_3) + assert object_app == { + "@type": "uco-observable:ApplicationFacet", + "@id": uuid_1, + "uco-core:name": app_1, + } + assert app_names == [app_1, app_2] + assert app_objects == [ + { + "@type": "uco-observable:ApplicationFacet", + "@id": uuid_1, + "uco-core:name": app_1, + }, + { + "@type": "uco-observable:ApplicationFacet", + "@id": uuid_2, + "uco-core:name": app_2, + }, + ] + + +def test_geo_coordinates() -> None: + geo_coordinates: list[str] = list() + geo_objects: list[dict[str, Any]] = list() + (lat_1, long_1, uuid_1) = (56.47267913, -71.17069244, "kb:" + str(uuid.uuid4())) + check_geo_coordinates(lat_1, long_1, geo_coordinates, geo_objects, uuid_1) + # print(f"\n 1) FT geo_coordinates={geo_coordinates}") + assert geo_coordinates == [str(lat_1) + "#" + str(long_1)] + (lat_2, long_2, uuid_2) = (88.26801306, 13.21980922, "kb:" + str(uuid.uuid4())) + check_geo_coordinates(lat_2, long_2, geo_coordinates, geo_objects, uuid_2) + # print(f"\n 2) FT geo_coordinates={geo_coordinates}") + assert geo_coordinates == [ + str(lat_1) + "#" + str(long_1), + str(lat_2) + "#" + str(long_2), + ] + uuid_3 = "kb:" + str(uuid.uuid4()) + uuid_4 = "kb:" + str(uuid.uuid4()) + geo_object = check_geo_coordinates( + lat_1, long_1, geo_coordinates, geo_objects, uuid_3 + ) + assert geo_object == { + "@type": "uco-location:LatLongCoordinatesFacet", + "@id": uuid_1, + "uco-location:latitude": {"@type": "xsd:decimal", "@value": str(lat_1)}, + "uco-location:longitude": {"@type": "xsd:decimal", "@value": str(long_1)}, + } + # print(f"\n 3) FT geo_coordinates={geo_coordinates}") + assert geo_coordinates == [ + str(lat_1) + "#" + str(long_1), + str(lat_2) + "#" + str(long_2), + ] + check_geo_coordinates(lat_2, long_2, geo_coordinates, geo_objects, uuid_4) + # print(f"\n 4) FT geo_coordinates={geo_coordinates}") + assert geo_coordinates == [ + str(lat_1) + "#" + str(long_1), + str(lat_2) + "#" + str(long_2), + ] + assert geo_objects == [ + { + "@type": "uco-location:LatLongCoordinatesFacet", + "@id": uuid_1, + "uco-location:latitude": {"@type": "xsd:decimal", "@value": str(lat_1)}, + "uco-location:longitude": {"@type": "xsd:decimal", "@value": str(long_1)}, + }, + { + "@type": "uco-location:LatLongCoordinatesFacet", + "@id": uuid_2, + "uco-location:latitude": {"@type": "xsd:decimal", "@value": str(lat_2)}, + "uco-location:longitude": {"@type": "xsd:decimal", "@value": str(long_2)}, + }, + ]