Skip to content

Commit ef48710

Browse files
authored
Merge pull request #220 from ArangoDB-Community/DE-361-pregel-api-updates
3:10 Updated Pregel API Features
2 parents fd38882 + 52683da commit ef48710

File tree

2 files changed

+78
-15
lines changed

2 files changed

+78
-15
lines changed

arango/formatter.py

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any
1+
from typing import Any, Sequence
22

33
from arango.typings import Headers, Json
44

@@ -1072,26 +1072,61 @@ def format_pregel_job_data(body: Json) -> Json:
10721072
"""
10731073
result: Json = {}
10741074

1075-
if "aggregators" in body:
1076-
result["aggregators"] = body["aggregators"]
1075+
if "id" in body:
1076+
result["id"] = body["id"]
1077+
if "algorithm" in body:
1078+
result["algorithm"] = body["algorithm"]
1079+
if "created" in body:
1080+
result["created"] = body["created"]
1081+
if "expires" in body:
1082+
result["expires"] = body["expires"]
1083+
if "ttl" in body:
1084+
result["ttl"] = body["ttl"]
1085+
if "algorithm" in body:
1086+
result["algorithm"] = body["algorithm"]
1087+
if "state" in body:
1088+
result["state"] = body["state"]
1089+
if "gss" in body:
1090+
result["gss"] = body["gss"]
1091+
if "totalRuntime" in body:
1092+
result["total_runtime"] = body["totalRuntime"]
1093+
if "startupTime" in body:
1094+
result["startup_time"] = body["startupTime"]
10771095
if "computationTime" in body:
10781096
result["computation_time"] = body["computationTime"]
1097+
if "storageTime" in body:
1098+
result["storageTime"] = body["storageTime"]
1099+
if "gssTimes" in body:
1100+
result["gssTimes"] = body["gssTimes"]
1101+
if "reports" in body:
1102+
result["reports"] = body["reports"]
1103+
if "vertexCount" in body:
1104+
result["vertex_count"] = body["vertexCount"]
10791105
if "edgeCount" in body:
10801106
result["edge_count"] = body["edgeCount"]
1081-
if "gss" in body:
1082-
result["gss"] = body["gss"]
1107+
if "aggregators" in body:
1108+
result["aggregators"] = body["aggregators"]
10831109
if "receivedCount" in body:
10841110
result["received_count"] = body["receivedCount"]
10851111
if "sendCount" in body:
10861112
result["send_count"] = body["sendCount"]
1087-
if "startupTime" in body:
1088-
result["startup_time"] = body["startupTime"]
1089-
if "state" in body:
1090-
result["state"] = body["state"]
1091-
if "totalRuntime" in body:
1092-
result["total_runtime"] = body["totalRuntime"]
1093-
if "vertexCount" in body:
1094-
result["vertex_count"] = body["vertexCount"]
1113+
1114+
# The detail element was introduced in 3.10
1115+
if "detail" in body:
1116+
result["detail"] = body["detail"]
1117+
1118+
return verify_format(body, result)
1119+
1120+
1121+
def format_pregel_job_list(body: Sequence[Json]) -> Json:
1122+
"""Format Pregel job list data.
1123+
1124+
:param body: Input body.
1125+
:type body: dict
1126+
:return: Formatted body.
1127+
:rtype: dict
1128+
"""
1129+
result: Json = {"jobs": [format_pregel_job_data(j) for j in body]}
10951130

10961131
return verify_format(body, result)
10971132

arango/pregel.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
__all__ = ["Pregel"]
22

3-
from typing import Optional
3+
from typing import Optional, Sequence
44

55
from arango.api import ApiGroup
66
from arango.exceptions import (
77
PregelJobCreateError,
88
PregelJobDeleteError,
99
PregelJobGetError,
1010
)
11-
from arango.formatter import format_pregel_job_data
11+
from arango.formatter import format_pregel_job_data, format_pregel_job_list
1212
from arango.request import Request
1313
from arango.response import Response
1414
from arango.result import Result
@@ -49,6 +49,8 @@ def create_job(
4949
async_mode: Optional[bool] = None,
5050
result_field: Optional[str] = None,
5151
algorithm_params: Optional[Json] = None,
52+
vertexCollections: Optional[Sequence[str]] = None,
53+
edgeCollections: Optional[Sequence[str]] = None,
5254
) -> Result[int]:
5355
"""Start a new Pregel job.
5456
@@ -74,12 +76,21 @@ def create_job(
7476
:type result_field: str | None
7577
:param algorithm_params: Additional algorithm parameters.
7678
:type algorithm_params: dict | None
79+
:param vertexCollections: List of vertex collection names.
80+
:type vertexCollections: Sequence[str] | None
81+
:param edgeCollections: List of edge collection names.
82+
:type edgeCollections: Sequence[str] | None
7783
:return: Pregel job ID.
7884
:rtype: int
7985
:raise arango.exceptions.PregelJobCreateError: If create fails.
8086
"""
8187
data: Json = {"algorithm": algorithm, "graphName": graph}
8288

89+
if vertexCollections is not None:
90+
data["vertexCollections"] = vertexCollections
91+
if edgeCollections is not None:
92+
data["edgeCollections"] = edgeCollections
93+
8394
if algorithm_params is None:
8495
algorithm_params = {}
8596

@@ -122,3 +133,20 @@ def response_handler(resp: Response) -> bool:
122133
raise PregelJobDeleteError(resp, request)
123134

124135
return self._execute(request, response_handler)
136+
137+
def jobs(self) -> Result[Json]:
138+
"""Returns a list of currently running and recently
139+
finished Pregel jobs without retrieving their results.
140+
141+
:return: Details of each running or recently finished Pregel job.
142+
:rtype: dict
143+
:raise arango.exceptions.PregelJobGetError: If retrieval fails.
144+
"""
145+
request = Request(method="get", endpoint="/_api/control_pregel")
146+
147+
def response_handler(resp: Response) -> Json:
148+
if resp.is_success:
149+
return format_pregel_job_list(resp.body)
150+
raise PregelJobGetError(resp, request)
151+
152+
return self._execute(request, response_handler)

0 commit comments

Comments
 (0)