diff --git a/arango/formatter.py b/arango/formatter.py index ffab8c4b..da0610e9 100644 --- a/arango/formatter.py +++ b/arango/formatter.py @@ -1,4 +1,4 @@ -from typing import Any +from typing import Any, Sequence from arango.typings import Headers, Json @@ -1072,26 +1072,61 @@ def format_pregel_job_data(body: Json) -> Json: """ result: Json = {} - if "aggregators" in body: - result["aggregators"] = body["aggregators"] + if "id" in body: + result["id"] = body["id"] + if "algorithm" in body: + result["algorithm"] = body["algorithm"] + if "created" in body: + result["created"] = body["created"] + if "expires" in body: + result["expires"] = body["expires"] + if "ttl" in body: + result["ttl"] = body["ttl"] + if "algorithm" in body: + result["algorithm"] = body["algorithm"] + if "state" in body: + result["state"] = body["state"] + if "gss" in body: + result["gss"] = body["gss"] + if "totalRuntime" in body: + result["total_runtime"] = body["totalRuntime"] + if "startupTime" in body: + result["startup_time"] = body["startupTime"] if "computationTime" in body: result["computation_time"] = body["computationTime"] + if "storageTime" in body: + result["storageTime"] = body["storageTime"] + if "gssTimes" in body: + result["gssTimes"] = body["gssTimes"] + if "reports" in body: + result["reports"] = body["reports"] + if "vertexCount" in body: + result["vertex_count"] = body["vertexCount"] if "edgeCount" in body: result["edge_count"] = body["edgeCount"] - if "gss" in body: - result["gss"] = body["gss"] + if "aggregators" in body: + result["aggregators"] = body["aggregators"] if "receivedCount" in body: result["received_count"] = body["receivedCount"] if "sendCount" in body: result["send_count"] = body["sendCount"] - if "startupTime" in body: - result["startup_time"] = body["startupTime"] - if "state" in body: - result["state"] = body["state"] - if "totalRuntime" in body: - result["total_runtime"] = body["totalRuntime"] - if "vertexCount" in body: - result["vertex_count"] = body["vertexCount"] + + # The detail element was introduced in 3.10 + if "detail" in body: + result["detail"] = body["detail"] + + return verify_format(body, result) + + +def format_pregel_job_list(body: Sequence[Json]) -> Json: + """Format Pregel job list data. + + :param body: Input body. + :type body: dict + :return: Formatted body. + :rtype: dict + """ + result: Json = {"jobs": [format_pregel_job_data(j) for j in body]} return verify_format(body, result) diff --git a/arango/pregel.py b/arango/pregel.py index 81313047..0cde3ce5 100644 --- a/arango/pregel.py +++ b/arango/pregel.py @@ -1,6 +1,6 @@ __all__ = ["Pregel"] -from typing import Optional +from typing import Optional, Sequence from arango.api import ApiGroup from arango.exceptions import ( @@ -8,7 +8,7 @@ PregelJobDeleteError, PregelJobGetError, ) -from arango.formatter import format_pregel_job_data +from arango.formatter import format_pregel_job_data, format_pregel_job_list from arango.request import Request from arango.response import Response from arango.result import Result @@ -49,6 +49,8 @@ def create_job( async_mode: Optional[bool] = None, result_field: Optional[str] = None, algorithm_params: Optional[Json] = None, + vertexCollections: Optional[Sequence[str]] = None, + edgeCollections: Optional[Sequence[str]] = None, ) -> Result[int]: """Start a new Pregel job. @@ -74,12 +76,21 @@ def create_job( :type result_field: str | None :param algorithm_params: Additional algorithm parameters. :type algorithm_params: dict | None + :param vertexCollections: List of vertex collection names. + :type vertexCollections: Sequence[str] | None + :param edgeCollections: List of edge collection names. + :type edgeCollections: Sequence[str] | None :return: Pregel job ID. :rtype: int :raise arango.exceptions.PregelJobCreateError: If create fails. """ data: Json = {"algorithm": algorithm, "graphName": graph} + if vertexCollections is not None: + data["vertexCollections"] = vertexCollections + if edgeCollections is not None: + data["edgeCollections"] = edgeCollections + if algorithm_params is None: algorithm_params = {} @@ -122,3 +133,20 @@ def response_handler(resp: Response) -> bool: raise PregelJobDeleteError(resp, request) return self._execute(request, response_handler) + + def jobs(self) -> Result[Json]: + """Returns a list of currently running and recently + finished Pregel jobs without retrieving their results. + + :return: Details of each running or recently finished Pregel job. + :rtype: dict + :raise arango.exceptions.PregelJobGetError: If retrieval fails. + """ + request = Request(method="get", endpoint="/_api/control_pregel") + + def response_handler(resp: Response) -> Json: + if resp.is_success: + return format_pregel_job_list(resp.body) + raise PregelJobGetError(resp, request) + + return self._execute(request, response_handler)