Skip to content

Commit 3bb24d8

Browse files
committed
Implement Wcc endpoints using Cypher procedures
1 parent ce3a6a4 commit 3bb24d8

File tree

5 files changed

+464
-0
lines changed

5 files changed

+464
-0
lines changed

graphdatascience/graph_data_science.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
from neo4j import Driver
88
from pandas import DataFrame
99

10+
from graphdatascience.procedure_surface.api.wcc_endpoints import WccEndpoints
11+
from graphdatascience.procedure_surface.cypher.wcc_proc_runner import WccCypherEndpoints
12+
1013
from .call_builder import IndirectCallBuilder
1114
from .endpoints import AlphaEndpoints, BetaEndpoints, DirectEndpoints
1215
from .error.uncallable_namespace import UncallableNamespace
@@ -106,10 +109,16 @@ def __init__(
106109
self._query_runner.set_show_progress(show_progress)
107110
super().__init__(self._query_runner, namespace="gds", server_version=self._server_version)
108111

112+
self._wcc_endpoints = WccCypherEndpoints(self._query_runner)
113+
109114
@property
110115
def graph(self) -> GraphProcRunner:
111116
return GraphProcRunner(self._query_runner, f"{self._namespace}.graph", self._server_version)
112117

118+
@property
119+
def wcc(self) -> WccEndpoints:
120+
return self._wcc_endpoints
121+
113122
@property
114123
def util(self) -> UtilProcRunner:
115124
return UtilProcRunner(self._query_runner, f"{self._namespace}.util", self._server_version)

graphdatascience/procedure_surface/__init__.py

Whitespace-only changes.

graphdatascience/procedure_surface/cypher/__init__.py

Whitespace-only changes.
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
from typing import Any, List, Optional
2+
3+
from pandas import DataFrame, Series
4+
5+
from ...call_parameters import CallParameters
6+
from ...graph.graph_object import Graph
7+
from ...query_runner.query_runner import QueryRunner
8+
from ..api.wcc_endpoints import WccEndpoints
9+
10+
11+
class WccCypherEndpoints(WccEndpoints):
12+
"""
13+
Implementation of the WCC algorithm endpoints.
14+
This class handles the actual execution by forwarding calls to the query runner.
15+
"""
16+
17+
def __init__(self, query_runner: QueryRunner):
18+
self._query_runner = query_runner
19+
20+
def mutate(
21+
self,
22+
G: Graph,
23+
mutate_property: str,
24+
threshold: Optional[float] = None,
25+
relationship_types: Optional[List[str]] = None,
26+
node_labels: Optional[List[str]] = None,
27+
sudo: Optional[bool] = None,
28+
log_progress: Optional[bool] = None,
29+
username: Optional[str] = None,
30+
concurrency: Optional[int] = None,
31+
job_id: Optional[str] = None,
32+
seed_property: Optional[str] = None,
33+
consecutive_ids: Optional[bool] = None,
34+
relationship_weight_property: Optional[str] = None,
35+
) -> Series[Any]:
36+
# Build configuration dictionary from parameters
37+
config: dict[str, Any] = {
38+
"mutateProperty": mutate_property,
39+
}
40+
41+
# Add optional parameters
42+
if threshold is not None:
43+
config["threshold"] = threshold
44+
if relationship_types is not None:
45+
config["relationshipTypes"] = relationship_types
46+
if node_labels is not None:
47+
config["nodeLabels"] = node_labels
48+
if sudo is not None:
49+
config["sudo"] = sudo
50+
if log_progress is not None:
51+
config["logProgress"] = log_progress
52+
if username is not None:
53+
config["username"] = username
54+
if concurrency is not None:
55+
config["concurrency"] = concurrency
56+
if job_id is not None:
57+
config["jobId"] = job_id
58+
if seed_property is not None:
59+
config["seedProperty"] = seed_property
60+
if consecutive_ids is not None:
61+
config["consecutiveIds"] = consecutive_ids
62+
if relationship_weight_property is not None:
63+
config["relationshipWeightProperty"] = relationship_weight_property
64+
65+
# Run procedure and return results
66+
params = CallParameters(graph_name=G.name(), config=config)
67+
params.ensure_job_id_in_config()
68+
69+
return self._query_runner.call_procedure(endpoint="gds.wcc.mutate", params=params).squeeze() # type: ignore
70+
71+
def stats(
72+
self,
73+
G: Graph,
74+
threshold: Optional[float] = None,
75+
relationship_types: Optional[List[str]] = None,
76+
node_labels: Optional[List[str]] = None,
77+
sudo: Optional[bool] = None,
78+
log_progress: Optional[bool] = None,
79+
username: Optional[str] = None,
80+
concurrency: Optional[int] = None,
81+
job_id: Optional[str] = None,
82+
seed_property: Optional[str] = None,
83+
consecutive_ids: Optional[bool] = None,
84+
relationship_weight_property: Optional[str] = None,
85+
) -> Series[Any]:
86+
# Build configuration dictionary from parameters
87+
config: dict[str, Any] = {}
88+
89+
# Add optional parameters
90+
if threshold is not None:
91+
config["threshold"] = threshold
92+
if relationship_types is not None:
93+
config["relationshipTypes"] = relationship_types
94+
if node_labels is not None:
95+
config["nodeLabels"] = node_labels
96+
if sudo is not None:
97+
config["sudo"] = sudo
98+
if log_progress is not None:
99+
config["logProgress"] = log_progress
100+
if username is not None:
101+
config["username"] = username
102+
if concurrency is not None:
103+
config["concurrency"] = concurrency
104+
if job_id is not None:
105+
config["jobId"] = job_id
106+
if seed_property is not None:
107+
config["seedProperty"] = seed_property
108+
if consecutive_ids is not None:
109+
config["consecutiveIds"] = consecutive_ids
110+
if relationship_weight_property is not None:
111+
config["relationshipWeightProperty"] = relationship_weight_property
112+
113+
# Run procedure and return results
114+
params = CallParameters(graph_name=G.name(), config=config)
115+
params.ensure_job_id_in_config()
116+
117+
return self._query_runner.call_procedure(endpoint="gds.wcc.stats", params=params).squeeze() # type: ignore
118+
119+
def stream(
120+
self,
121+
G: Graph,
122+
min_component_size: Optional[int] = None,
123+
threshold: Optional[float] = None,
124+
relationship_types: Optional[List[str]] = None,
125+
node_labels: Optional[List[str]] = None,
126+
sudo: Optional[bool] = None,
127+
log_progress: Optional[bool] = None,
128+
username: Optional[str] = None,
129+
concurrency: Optional[int] = None,
130+
job_id: Optional[str] = None,
131+
seed_property: Optional[str] = None,
132+
consecutive_ids: Optional[bool] = None,
133+
relationship_weight_property: Optional[str] = None,
134+
) -> DataFrame:
135+
# Build configuration dictionary from parameters
136+
config: dict[str, Any] = {}
137+
138+
# Add optional parameters
139+
if min_component_size is not None:
140+
config["minComponentSize"] = min_component_size
141+
if threshold is not None:
142+
config["threshold"] = threshold
143+
if relationship_types is not None:
144+
config["relationshipTypes"] = relationship_types
145+
if node_labels is not None:
146+
config["nodeLabels"] = node_labels
147+
if sudo is not None:
148+
config["sudo"] = sudo
149+
if log_progress is not None:
150+
config["logProgress"] = log_progress
151+
if username is not None:
152+
config["username"] = username
153+
if concurrency is not None:
154+
config["concurrency"] = concurrency
155+
if job_id is not None:
156+
config["jobId"] = job_id
157+
if seed_property is not None:
158+
config["seedProperty"] = seed_property
159+
if consecutive_ids is not None:
160+
config["consecutiveIds"] = consecutive_ids
161+
if relationship_weight_property is not None:
162+
config["relationshipWeightProperty"] = relationship_weight_property
163+
164+
# Run procedure and return results
165+
params = CallParameters(graph_name=G.name(), config=config)
166+
params.ensure_job_id_in_config()
167+
168+
return self._query_runner.call_procedure(endpoint="gds.wcc.stream", params=params)
169+
170+
def write(
171+
self,
172+
G: Graph,
173+
write_property: str,
174+
min_component_size: Optional[int] = None,
175+
threshold: Optional[float] = None,
176+
relationship_types: Optional[List[str]] = None,
177+
node_labels: Optional[List[str]] = None,
178+
sudo: Optional[bool] = None,
179+
log_progress: Optional[bool] = None,
180+
username: Optional[str] = None,
181+
concurrency: Optional[int] = None,
182+
job_id: Optional[str] = None,
183+
seed_property: Optional[str] = None,
184+
consecutive_ids: Optional[bool] = None,
185+
relationship_weight_property: Optional[str] = None,
186+
write_concurrency: Optional[int] = None,
187+
write_to_result_store: Optional[bool] = None,
188+
) -> Series[Any]:
189+
# Build configuration dictionary from parameters
190+
config: dict[str, Any] = {
191+
"writeProperty": write_property,
192+
}
193+
194+
# Add optional parameters
195+
if min_component_size is not None:
196+
config["minComponentSize"] = min_component_size
197+
if threshold is not None:
198+
config["threshold"] = threshold
199+
if relationship_types is not None:
200+
config["relationshipTypes"] = relationship_types
201+
if node_labels is not None:
202+
config["nodeLabels"] = node_labels
203+
if sudo is not None:
204+
config["sudo"] = sudo
205+
if log_progress is not None:
206+
config["logProgress"] = log_progress
207+
if username is not None:
208+
config["username"] = username
209+
if concurrency is not None:
210+
config["concurrency"] = concurrency
211+
if job_id is not None:
212+
config["jobId"] = job_id
213+
if seed_property is not None:
214+
config["seedProperty"] = seed_property
215+
if consecutive_ids is not None:
216+
config["consecutiveIds"] = consecutive_ids
217+
if relationship_weight_property is not None:
218+
config["relationshipWeightProperty"] = relationship_weight_property
219+
if write_concurrency is not None:
220+
config["writeConcurrency"] = write_concurrency
221+
if write_to_result_store is not None:
222+
config["writeToResultStore"] = write_to_result_store
223+
224+
# Run procedure and return results
225+
params = CallParameters(graph_name=G.name(), config=config)
226+
params.ensure_job_id_in_config()
227+
228+
return self._query_runner.call_procedure(endpoint="gds.wcc.write", params=params).squeeze() # type: ignore

0 commit comments

Comments
 (0)