From 749cc84327db60d288579eb9696a5dd13e946e29 Mon Sep 17 00:00:00 2001 From: tjoubert Date: Mon, 26 Sep 2022 11:33:28 +0400 Subject: [PATCH 1/3] Retrieve all jobs and the new 3.10 details element --- arango/formatter.py | 105 ++++++++++++++++++++++++++++++++++++++------ arango/pregel.py | 32 +++++++++++++- 2 files changed, 122 insertions(+), 15 deletions(-) diff --git a/arango/formatter.py b/arango/formatter.py index ffab8c4b..4984bf82 100644 --- a/arango/formatter.py +++ b/arango/formatter.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, List, Sequence from arango.typings import Headers, Json @@ -1072,26 +1072,105 @@ def format_pregel_job_data(body: Json) -> Json: """ result: Json = {} - if "aggregators" in body: - result["aggregators"] = body["aggregators"] + if "id" in body: + result["id"] = body["id"] + if "algorithm" in body: + result["algorithm"] = body["algorithm"] + if "created" in body: + result["created"] = body["created"] + if "expires" in body: + result["expires"] = body["expires"] + if "ttl" in body: + result["ttl"] = body["ttl"] + if "algorithm" in body: + result["algorithm"] = body["algorithm"] + if "state" in body: + result["state"] = body["state"] + if "gss" in body: + result["gss"] = body["gss"] + if "totalRuntime" in body: + result["total_runtime"] = body["totalRuntime"] + if "startupTime" in body: + result["startup_time"] = body["startupTime"] if "computationTime" in body: result["computation_time"] = body["computationTime"] + if "storageTime" in body: + result["storageTime"] = body["storageTime"] + if "gssTimes" in body: + result["gssTimes"] = body["gssTimes"] + if "reports" in body: + result["reports"] = body["reports"] + if "vertexCount" in body: + result["vertex_count"] = body["vertexCount"] if "edgeCount" in body: result["edge_count"] = body["edgeCount"] - if "gss" in body: - result["gss"] = body["gss"] + + # The detail element was introduced in 3.10 + if "detail" in body: + d: Json = {} + detail = body["detail"] + if "workerStatus" in detail: + d["workerStatus"] = detail["workerStatus"] + if "aggregatedStatus" in detail: + aggregatedStatus = detail["aggregatedStatus"] + aStat: Json = {} + if "timeStamp" in aggregatedStatus: + aStat["timeStamp"] = aggregatedStatus["timeStamp"] + if "graphStoreStatus" in aggregatedStatus: + graphStoreStatus = aggregatedStatus["graphStoreStatus"] + gsStat: Json = {} + if "verticesLoaded" in graphStoreStatus: + gsStat["verticesLoaded"] = graphStoreStatus["verticesLoaded"] + if "edgesLoaded" in graphStoreStatus: + gsStat["edgesLoaded"] = graphStoreStatus["edgesLoaded"] + if "memoryBytesUsed" in graphStoreStatus: + gsStat["memoryBytesUsed"] = graphStoreStatus["memoryBytesUsed"] + if "verticesStored" in graphStoreStatus: + gsStat["verticesStored"] = graphStoreStatus["verticesStored"] + aStat["graphStoreStatus"] = gsStat + if "allGssStatus" in aggregatedStatus: + allGssStatus = aggregatedStatus["allGssStatus"] + agStat: Json = {} + if "items" in allGssStatus: + items = allGssStatus["items"] + itemList: List[Json] = [] + for i in items: + ri: Json = {} + if "verticesProcessed" in i: + ri["verticesProcessed"] = i["verticesProcessed"] + if "messagesSent" in i: + ri["messagesSent"] = i["messagesSent"] + if "messagesReceived" in i: + ri["messagesReceived"] = i["messagesReceived"] + if "memoryBytesUsedForMessages" in i: + ri["memoryBytesUsedForMessages"] = i[ + "memoryBytesUsedForMessages" + ] + itemList.append(ri) + agStat["items"] = itemList + aStat["allGssStatus"] = agStat + d[aggregatedStatus] = aStat + result["detail"] = d + + if "aggregators" in body: + result["aggregators"] = body["aggregators"] if "receivedCount" in body: result["received_count"] = body["receivedCount"] if "sendCount" in body: result["send_count"] = body["sendCount"] - if "startupTime" in body: - result["startup_time"] = body["startupTime"] - if "state" in body: - result["state"] = body["state"] - if "totalRuntime" in body: - result["total_runtime"] = body["totalRuntime"] - if "vertexCount" in body: - result["vertex_count"] = body["vertexCount"] + + return verify_format(body, result) + + +def format_pregel_job_list(body: Sequence[Json]) -> Json: + """Format Pregel job list data. + + :param body: Input body. + :type body: dict + :return: Formatted body. + :rtype: dict + """ + result: Json = {"jobs": [format_pregel_job_data(j) for j in body]} return verify_format(body, result) diff --git a/arango/pregel.py b/arango/pregel.py index 81313047..0cde3ce5 100644 --- a/arango/pregel.py +++ b/arango/pregel.py @@ -1,6 +1,6 @@ __all__ = ["Pregel"] -from typing import Optional +from typing import Optional, Sequence from arango.api import ApiGroup from arango.exceptions import ( @@ -8,7 +8,7 @@ PregelJobDeleteError, PregelJobGetError, ) -from arango.formatter import format_pregel_job_data +from arango.formatter import format_pregel_job_data, format_pregel_job_list from arango.request import Request from arango.response import Response from arango.result import Result @@ -49,6 +49,8 @@ def create_job( async_mode: Optional[bool] = None, result_field: Optional[str] = None, algorithm_params: Optional[Json] = None, + vertexCollections: Optional[Sequence[str]] = None, + edgeCollections: Optional[Sequence[str]] = None, ) -> Result[int]: """Start a new Pregel job. @@ -74,12 +76,21 @@ def create_job( :type result_field: str | None :param algorithm_params: Additional algorithm parameters. :type algorithm_params: dict | None + :param vertexCollections: List of vertex collection names. + :type vertexCollections: Sequence[str] | None + :param edgeCollections: List of edge collection names. + :type edgeCollections: Sequence[str] | None :return: Pregel job ID. :rtype: int :raise arango.exceptions.PregelJobCreateError: If create fails. """ data: Json = {"algorithm": algorithm, "graphName": graph} + if vertexCollections is not None: + data["vertexCollections"] = vertexCollections + if edgeCollections is not None: + data["edgeCollections"] = edgeCollections + if algorithm_params is None: algorithm_params = {} @@ -122,3 +133,20 @@ def response_handler(resp: Response) -> bool: raise PregelJobDeleteError(resp, request) return self._execute(request, response_handler) + + def jobs(self) -> Result[Json]: + """Returns a list of currently running and recently + finished Pregel jobs without retrieving their results. + + :return: Details of each running or recently finished Pregel job. + :rtype: dict + :raise arango.exceptions.PregelJobGetError: If retrieval fails. + """ + request = Request(method="get", endpoint="/_api/control_pregel") + + def response_handler(resp: Response) -> Json: + if resp.is_success: + return format_pregel_job_list(resp.body) + raise PregelJobGetError(resp, request) + + return self._execute(request, response_handler) From 82b72d830467cf2748bab25815c3304d4b3b4d45 Mon Sep 17 00:00:00 2001 From: tjoubert Date: Mon, 26 Sep 2022 16:54:20 +0400 Subject: [PATCH 2/3] Removed deepcopy of detail element --- arango/formatter.py | 45 +-------------------------------------------- 1 file changed, 1 insertion(+), 44 deletions(-) diff --git a/arango/formatter.py b/arango/formatter.py index 4984bf82..306a0fc3 100644 --- a/arango/formatter.py +++ b/arango/formatter.py @@ -1107,50 +1107,7 @@ def format_pregel_job_data(body: Json) -> Json: # The detail element was introduced in 3.10 if "detail" in body: - d: Json = {} - detail = body["detail"] - if "workerStatus" in detail: - d["workerStatus"] = detail["workerStatus"] - if "aggregatedStatus" in detail: - aggregatedStatus = detail["aggregatedStatus"] - aStat: Json = {} - if "timeStamp" in aggregatedStatus: - aStat["timeStamp"] = aggregatedStatus["timeStamp"] - if "graphStoreStatus" in aggregatedStatus: - graphStoreStatus = aggregatedStatus["graphStoreStatus"] - gsStat: Json = {} - if "verticesLoaded" in graphStoreStatus: - gsStat["verticesLoaded"] = graphStoreStatus["verticesLoaded"] - if "edgesLoaded" in graphStoreStatus: - gsStat["edgesLoaded"] = graphStoreStatus["edgesLoaded"] - if "memoryBytesUsed" in graphStoreStatus: - gsStat["memoryBytesUsed"] = graphStoreStatus["memoryBytesUsed"] - if "verticesStored" in graphStoreStatus: - gsStat["verticesStored"] = graphStoreStatus["verticesStored"] - aStat["graphStoreStatus"] = gsStat - if "allGssStatus" in aggregatedStatus: - allGssStatus = aggregatedStatus["allGssStatus"] - agStat: Json = {} - if "items" in allGssStatus: - items = allGssStatus["items"] - itemList: List[Json] = [] - for i in items: - ri: Json = {} - if "verticesProcessed" in i: - ri["verticesProcessed"] = i["verticesProcessed"] - if "messagesSent" in i: - ri["messagesSent"] = i["messagesSent"] - if "messagesReceived" in i: - ri["messagesReceived"] = i["messagesReceived"] - if "memoryBytesUsedForMessages" in i: - ri["memoryBytesUsedForMessages"] = i[ - "memoryBytesUsedForMessages" - ] - itemList.append(ri) - agStat["items"] = itemList - aStat["allGssStatus"] = agStat - d[aggregatedStatus] = aStat - result["detail"] = d + result["detail"] = body["detail"] if "aggregators" in body: result["aggregators"] = body["aggregators"] From 52683da4846115b136580c6fa06cf9a18b802eba Mon Sep 17 00:00:00 2001 From: tjoubert Date: Mon, 26 Sep 2022 16:56:45 +0400 Subject: [PATCH 3/3] Removed List import --- arango/formatter.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/arango/formatter.py b/arango/formatter.py index 306a0fc3..da0610e9 100644 --- a/arango/formatter.py +++ b/arango/formatter.py @@ -1,4 +1,4 @@ -from typing import Any, List, Sequence +from typing import Any, Sequence from arango.typings import Headers, Json @@ -1104,11 +1104,6 @@ def format_pregel_job_data(body: Json) -> Json: result["vertex_count"] = body["vertexCount"] if "edgeCount" in body: result["edge_count"] = body["edgeCount"] - - # The detail element was introduced in 3.10 - if "detail" in body: - result["detail"] = body["detail"] - if "aggregators" in body: result["aggregators"] = body["aggregators"] if "receivedCount" in body: @@ -1116,6 +1111,10 @@ def format_pregel_job_data(body: Json) -> Json: if "sendCount" in body: result["send_count"] = body["sendCount"] + # The detail element was introduced in 3.10 + if "detail" in body: + result["detail"] = body["detail"] + return verify_format(body, result)