1
1
from codeflare_sdk import (
2
2
Cluster ,
3
3
ClusterConfiguration ,
4
- TokenAuthentication ,
5
4
generate_cert ,
6
5
)
7
6
8
7
import pytest
9
8
import ray
10
9
import math
10
+ import subprocess
11
11
12
12
from support import *
13
13
14
14
15
15
@pytest .mark .kind
16
- class TestRayLocalInteractiveOauth :
16
+ class TestRayLocalInteractiveKind :
17
17
def setup_method(self):
    """Prepare per-test state before each test method runs.

    Calls ``initialize_kubernetes_client`` on this instance, then records
    that no port-forward subprocess has been started yet.
    """
    initialize_kubernetes_client(self)
    # Nothing has been launched so far; cleanup_port_forward treats a falsy
    # handle as "nothing to stop".
    self.port_forward_process = None
20
+
21
def cleanup_port_forward(self):
    """Stop the background port-forward subprocess, if one was started.

    Does nothing when no process handle is recorded, so it is safe to call
    more than once (the test body and ``teardown_method`` both call it).

    The process is first asked to terminate and given 10 seconds to exit.
    If it is still alive after that, it is force-killed and reaped so no
    child process is leaked. The stored handle is reset to ``None`` even
    when stopping the process raises, so a later call never acts on a
    stale handle.
    """
    process = self.port_forward_process
    if not process:
        return
    try:
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # terminate() was not honoured in time: escalate to kill() and
            # reap the child instead of letting the timeout abort teardown.
            process.kill()
            process.wait()
    finally:
        self.port_forward_process = None
19
26
20
27
def teardown_method(self):
    """Release per-test resources after each test method.

    Stops any port-forward subprocess first, then calls
    ``delete_namespace`` and ``delete_kueue_resources`` on this instance.
    """
    # Stop the local port-forward before removing the cluster-side resources.
    self.cleanup_port_forward()
    delete_namespace(self)
    delete_kueue_resources(self)
23
31
@@ -39,6 +47,8 @@ def run_local_interactives(
39
47
):
40
48
cluster_name = "test-ray-cluster-li"
41
49
50
+ ray .shutdown ()
51
+
42
52
cluster = Cluster (
43
53
ClusterConfiguration (
44
54
name = cluster_name ,
@@ -49,25 +59,24 @@ def run_local_interactives(
49
59
head_memory_requests = 2 ,
50
60
head_memory_limits = 2 ,
51
61
worker_cpu_requests = "500m" ,
52
- worker_cpu_limits = 1 ,
62
+ worker_cpu_limits = "500m" ,
53
63
worker_memory_requests = 1 ,
54
64
worker_memory_limits = 4 ,
55
65
worker_extended_resource_requests = {gpu_resource_name : number_of_gpus },
56
- write_to_file = True ,
57
66
verify_tls = False ,
58
67
)
59
68
)
69
+
60
70
cluster .up ()
71
+
61
72
cluster .wait_ready ()
73
+ cluster .status ()
62
74
63
75
generate_cert .generate_tls_cert (cluster_name , self .namespace )
64
76
generate_cert .export_env (cluster_name , self .namespace )
65
77
66
78
print (cluster .local_client_url ())
67
79
68
- ray .shutdown ()
69
- ray .init (address = cluster .local_client_url (), logging_level = "DEBUG" )
70
-
71
80
@ray .remote (num_gpus = number_of_gpus / 2 )
72
81
def heavy_calculation_part (num_iterations ):
73
82
result = 0.0
@@ -84,10 +93,34 @@ def heavy_calculation(num_iterations):
84
93
)
85
94
return sum (results )
86
95
87
- ref = heavy_calculation .remote (3000 )
88
- result = ray .get (ref )
89
- assert result == 1789.4644387076714
90
- ray .cancel (ref )
91
- ray .shutdown ()
96
+ # Attempt to port forward
97
+ try :
98
+ local_port = "20001"
99
+ ray_client_port = "10001"
100
+
101
+ port_forward_cmd = [
102
+ "kubectl" ,
103
+ "port-forward" ,
104
+ "-n" ,
105
+ self .namespace ,
106
+ f"svc/{ cluster_name } -head-svc" ,
107
+ f"{ local_port } :{ ray_client_port } " ,
108
+ ]
109
+ self .port_forward_process = subprocess .Popen (
110
+ port_forward_cmd , stdout = subprocess .DEVNULL , stderr = subprocess .DEVNULL
111
+ )
112
+
113
+ client_url = f"ray://localhost:{ local_port } "
114
+ cluster .status ()
115
+
116
+ ray .init (address = client_url , logging_level = "INFO" )
117
+
118
+ ref = heavy_calculation .remote (3000 )
119
+ result = ray .get (ref )
120
+ assert result == 1789.4644387076714
121
+ ray .cancel (ref )
122
+ ray .shutdown ()
92
123
93
- cluster .down ()
124
+ cluster .down ()
125
+ finally :
126
+ self .cleanup_port_forward ()
0 commit comments