21
21
from time import sleep
22
22
from typing import List , Optional , Tuple , Dict
23
23
24
+ import openshift as oc
25
+ from kubernetes import config
24
26
from ray .job_submission import JobSubmissionClient
27
+ import urllib3
25
28
26
29
from .auth import config_check , api_config_handler
27
30
from ..utils import pretty_print
28
31
from ..utils .generate_yaml import generate_appwrapper
29
32
from ..utils .kube_api_helpers import _kube_api_error_handling
33
+ from ..utils .openshift_oauth import (
34
+ create_openshift_oauth_objects ,
35
+ delete_openshift_oauth_objects ,
36
+ download_tls_cert ,
37
+ )
30
38
from .config import ClusterConfiguration
31
39
from .model import (
32
40
AppWrapper ,
40
48
import os
41
49
import requests
42
50
51
+ from kubernetes import config
52
+
43
53
44
54
class Cluster :
45
55
"""
@@ -61,6 +71,38 @@ def __init__(self, config: ClusterConfiguration):
61
71
self .config = config
62
72
self .app_wrapper_yaml = self .create_app_wrapper ()
63
73
self .app_wrapper_name = self .app_wrapper_yaml .split ("." )[0 ]
74
+ self ._client = None
75
+
76
@property
def _client_headers(self):
    """
    HTTP headers sent with requests to the cluster dashboard.

    Returns a dict whose single "Authorization" entry is the value that the
    current API configuration yields for the "authorization" key
    (prefix included).
    """
    api_configuration = api_config_handler().configuration
    authorization_value = api_configuration.get_api_key_with_prefix("authorization")
    return {"Authorization": authorization_value}
83
+
84
+ @property
85
+ def _client_verify_tls (self ):
86
+ return not self .config .openshift_oauth
87
+
88
@property
def client(self):
    """
    Ray job submission client for this cluster, created on first access.

    The client is built once against `cluster_dashboard_uri()` and cached
    on `self._client`; later accesses return the cached instance.

    When `config.openshift_oauth` is set, the client is created with the
    headers from `_client_headers` and the TLS setting from
    `_client_verify_tls`. Otherwise it is created with the dashboard URI only.
    """
    if self._client is not None:
        return self._client

    dashboard_uri = self.cluster_dashboard_uri()
    if self.config.openshift_oauth:
        # The Authorization header value is a credential, so it is passed
        # to the client only and never echoed to stdout or logs.
        self._client = JobSubmissionClient(
            dashboard_uri,
            headers=self._client_headers,
            verify=self._client_verify_tls,
        )
    else:
        self._client = JobSubmissionClient(dashboard_uri)
    return self._client
64
106
65
107
def evaluate_dispatch_priority (self ):
66
108
priority_class = self .config .dispatch_priority
@@ -141,6 +183,7 @@ def create_app_wrapper(self):
141
183
image_pull_secrets = image_pull_secrets ,
142
184
dispatch_priority = dispatch_priority ,
143
185
priority_val = priority_val ,
186
+ openshift_oauth = self .config .openshift_oauth ,
144
187
)
145
188
146
189
# creates a new cluster with the provided or default spec
@@ -150,6 +193,11 @@ def up(self):
150
193
the MCAD queue.
151
194
"""
152
195
namespace = self .config .namespace
196
+ if self .config .openshift_oauth :
197
+ create_openshift_oauth_objects (
198
+ cluster_name = self .config .name , namespace = namespace
199
+ )
200
+
153
201
try :
154
202
config_check ()
155
203
api_instance = client .CustomObjectsApi (api_config_handler ())
@@ -184,6 +232,11 @@ def down(self):
184
232
except Exception as e : # pragma: no cover
185
233
return _kube_api_error_handling (e )
186
234
235
+ if self .config .openshift_oauth :
236
+ delete_openshift_oauth_objects (
237
+ cluster_name = self .config .name , namespace = namespace
238
+ )
239
+
187
240
def status (
188
241
self , print_to_console : bool = True
189
242
) -> Tuple [CodeFlareClusterStatus , bool ]:
@@ -252,7 +305,16 @@ def status(
252
305
return status , ready
253
306
254
307
def is_dashboard_ready (self ) -> bool :
255
- response = requests .get (self .cluster_dashboard_uri (), timeout = 5 )
308
+ try :
309
+ response = requests .get (
310
+ self .cluster_dashboard_uri (),
311
+ headers = self ._client_headers ,
312
+ timeout = 5 ,
313
+ verify = self ._client_verify_tls ,
314
+ )
315
+ except requests .exceptions .SSLError :
316
+ # SSL exception occurs when oauth ingress has been created but cluster is not up
317
+ return False
256
318
if response .status_code == 200 :
257
319
return True
258
320
else :
@@ -311,7 +373,13 @@ def cluster_dashboard_uri(self) -> str:
311
373
return _kube_api_error_handling (e )
312
374
313
375
for route in routes ["items" ]:
314
- if route ["metadata" ]["name" ] == f"ray-dashboard-{ self .config .name } " :
376
+ if route ["metadata" ][
377
+ "name"
378
+ ] == f"ray-dashboard-{ self .config .name } " or route ["metadata" ][
379
+ "name"
380
+ ].startswith (
381
+ f"{ self .config .name } -ingress"
382
+ ):
315
383
protocol = "https" if route ["spec" ].get ("tls" ) else "http"
316
384
return f"{ protocol } ://{ route ['spec' ]['host' ]} "
317
385
return "Dashboard route not available yet, have you run cluster.up()?"
@@ -320,30 +388,24 @@ def list_jobs(self) -> List:
320
388
"""
321
389
This method accesses the head ray node in your cluster and lists the running jobs.
322
390
"""
323
- dashboard_route = self .cluster_dashboard_uri ()
324
- client = JobSubmissionClient (dashboard_route )
325
- return client .list_jobs ()
391
+ return self .client .list_jobs ()
326
392
327
393
def job_status(self, job_id: str) -> str:
    """
    Ask this cluster's Ray job client for the status of the job with the given id
    and return whatever the client reports.
    """
    ray_client = self.client
    return ray_client.get_job_status(job_id)
334
398
335
399
def job_logs(self, job_id: str) -> str:
    """
    Ask this cluster's Ray job client for the logs of the job with the given id
    and return whatever the client reports.
    """
    ray_client = self.client
    return ray_client.get_job_logs(job_id)
342
404
343
405
def torchx_config (
344
406
self , working_dir : str = None , requirements : str = None
345
407
) -> Dict [str , str ]:
346
- dashboard_address = f" { self .cluster_dashboard_uri (). lstrip ( 'http://' ) } "
408
+ dashboard_address = urllib3 . util . parse_url ( self .cluster_dashboard_uri ()). host
347
409
to_return = {
348
410
"cluster_name" : self .config .name ,
349
411
"dashboard_address" : dashboard_address ,
@@ -455,8 +517,8 @@ def get_current_namespace(): # pragma: no cover
455
517
456
518
def get_cluster (cluster_name : str , namespace : str = "default" ):
457
519
try :
458
- config_check ()
459
- api_instance = client .CustomObjectsApi (api_config_handler () )
520
+ config . load_kube_config ()
521
+ api_instance = client .CustomObjectsApi ()
460
522
rcs = api_instance .list_namespaced_custom_object (
461
523
group = "ray.io" ,
462
524
version = "v1alpha1" ,
@@ -477,7 +539,7 @@ def get_cluster(cluster_name: str, namespace: str = "default"):
477
539
# private methods
478
540
def _get_ingress_domain ():
479
541
try :
480
- config_check ()
542
+ config . load_kube_config ()
481
543
api_client = client .CustomObjectsApi (api_config_handler ())
482
544
ingress = api_client .get_cluster_custom_object (
483
545
"config.openshift.io" , "v1" , "ingresses" , "cluster"
@@ -572,7 +634,7 @@ def _get_app_wrappers(
572
634
573
635
574
636
def _map_to_ray_cluster (rc ) -> Optional [RayCluster ]:
575
- if "status" in rc and " state" in rc ["status" ]:
637
+ if "state" in rc ["status" ]:
576
638
status = RayClusterStatus (rc ["status" ]["state" ].lower ())
577
639
else :
578
640
status = RayClusterStatus .UNKNOWN
@@ -587,7 +649,13 @@ def _map_to_ray_cluster(rc) -> Optional[RayCluster]:
587
649
)
588
650
ray_route = None
589
651
for route in routes ["items" ]:
590
- if route ["metadata" ]["name" ] == f"ray-dashboard-{ rc ['metadata' ]['name' ]} " :
652
+ if route ["metadata" ][
653
+ "name"
654
+ ] == f"ray-dashboard-{ rc ['metadata' ]['name' ]} " or route ["metadata" ][
655
+ "name"
656
+ ].startswith (
657
+ f"{ rc ['metadata' ]['name' ]} -ingress"
658
+ ):
591
659
protocol = "https" if route ["spec" ].get ("tls" ) else "http"
592
660
ray_route = f"{ protocol } ://{ route ['spec' ]['host' ]} "
593
661
0 commit comments