@@ -22,18 +22,28 @@ import (
22
22
"crypto/sha1"
23
23
"encoding/base64"
24
24
25
+ "fmt"
26
+ "strconv"
27
+ "strings"
28
+
29
+ "github.com/go-logr/logr"
25
30
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
26
31
27
32
corev1 "k8s.io/api/core/v1"
33
+ networkingv1 "k8s.io/api/networking/v1"
28
34
rbacv1 "k8s.io/api/rbac/v1"
29
35
"k8s.io/apimachinery/pkg/api/errors"
30
36
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31
37
"k8s.io/apimachinery/pkg/runtime"
38
+ "k8s.io/apimachinery/pkg/types"
32
39
"k8s.io/apimachinery/pkg/util/intstr"
33
40
coreapply "k8s.io/client-go/applyconfigurations/core/v1"
34
41
v1 "k8s.io/client-go/applyconfigurations/meta/v1"
42
+ networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1"
35
43
rbacapply "k8s.io/client-go/applyconfigurations/rbac/v1"
44
+ "k8s.io/client-go/discovery"
36
45
"k8s.io/client-go/kubernetes"
46
+ "k8s.io/client-go/rest"
37
47
ctrl "sigs.k8s.io/controller-runtime"
38
48
"sigs.k8s.io/controller-runtime/pkg/client"
39
49
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -53,12 +63,14 @@ type RayClusterReconciler struct {
53
63
}
54
64
55
65
const (
56
- requeueTime = 10
57
- controllerName = "codeflare-raycluster-controller"
58
- oAuthFinalizer = "ray.openshift.ai/oauth-finalizer"
59
- oAuthServicePort = 443
60
- oAuthServicePortName = "oauth-proxy"
61
- logRequeueing = "requeueing"
66
+ requeueTime = 10
67
+ controllerName = "codeflare-raycluster-controller"
68
+ oAuthFinalizer = "ray.openshift.ai/oauth-finalizer"
69
+ oAuthServicePort = 443
70
+ oAuthServicePortName = "oauth-proxy"
71
+ oauthAnnotation = "codeflare.dev/oauth"
72
+ RegularServicePortName = "dashboard"
73
+ logRequeueing = "requeueing"
62
74
)
63
75
64
76
var (
@@ -97,6 +109,10 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
97
109
return ctrl.Result {}, client .IgnoreNotFound (err )
98
110
}
99
111
112
+ isLocalInteractive := annotationBoolVal (logger , & cluster , "sdk.codeflare.dev/local_interactive" )
113
+ isOpenShift , ingressHost := getClusterType (logger , r .kubeClient , & cluster )
114
+ ingressDomain := cluster .ObjectMeta .Annotations ["sdk.codeflare.dev/ingress_domain" ]
115
+
100
116
if cluster .ObjectMeta .DeletionTimestamp .IsZero () {
101
117
if ! controllerutil .ContainsFinalizer (& cluster , oAuthFinalizer ) {
102
118
logger .Info ("Add a finalizer" , "finalizer" , oAuthFinalizer )
@@ -130,34 +146,81 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
130
146
return ctrl.Result {}, nil
131
147
}
132
148
133
- _ , err := r .routeClient .Routes (cluster .Namespace ).Apply (ctx , desiredClusterRoute (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
134
- if err != nil {
135
- logger .Error (err , "Failed to update OAuth Route" )
136
- }
149
+ if cluster .Status .State != "suspended" && annotationBoolVal (logger , & cluster , oauthAnnotation ) && isOpenShift {
150
+ logger .Info ("Creating OAuth Objects" )
151
+ _ , err := r .routeClient .Routes (cluster .Namespace ).Apply (ctx , desiredClusterRoute (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
152
+ if err != nil {
153
+ logger .Error (err , "Failed to update OAuth Route" )
154
+ }
137
155
138
- _ , err = r .kubeClient .CoreV1 ().Secrets (cluster .Namespace ).Apply (ctx , desiredOAuthSecret (& cluster , r ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
139
- if err != nil {
140
- logger .Error (err , "Failed to create OAuth Secret" )
141
- }
156
+ _ , err = r .kubeClient .CoreV1 ().Secrets (cluster .Namespace ).Apply (ctx , desiredOAuthSecret (& cluster , r ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
157
+ if err != nil {
158
+ logger .Error (err , "Failed to create OAuth Secret" )
159
+ }
142
160
143
- _ , err = r .kubeClient .CoreV1 ().Services (cluster .Namespace ).Apply (ctx , desiredOAuthService (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
144
- if err != nil {
145
- logger .Error (err , "Failed to update OAuth Service" )
146
- }
161
+ _ , err = r .kubeClient .CoreV1 ().Services (cluster .Namespace ).Apply (ctx , desiredOAuthService (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
162
+ if err != nil {
163
+ logger .Error (err , "Failed to update OAuth Service" )
164
+ }
147
165
148
- _ , err = r .kubeClient .CoreV1 ().ServiceAccounts (cluster .Namespace ).Apply (ctx , desiredServiceAccount (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
149
- if err != nil {
150
- logger .Error (err , "Failed to update OAuth ServiceAccount" )
151
- }
166
+ _ , err = r .kubeClient .CoreV1 ().ServiceAccounts (cluster .Namespace ).Apply (ctx , desiredServiceAccount (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
167
+ if err != nil {
168
+ logger .Error (err , "Failed to update OAuth ServiceAccount" )
169
+ }
152
170
153
- _ , err = r .kubeClient .RbacV1 ().ClusterRoleBindings ().Apply (ctx , desiredOAuthClusterRoleBinding (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
154
- if err != nil {
155
- logger .Error (err , "Failed to update OAuth ClusterRoleBinding" )
171
+ _ , err = r .kubeClient .RbacV1 ().ClusterRoleBindings ().Apply (ctx , desiredOAuthClusterRoleBinding (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
172
+ if err != nil {
173
+ logger .Error (err , "Failed to update OAuth ClusterRoleBinding" )
174
+ }
175
+
176
+ } else if cluster .Status .State != "suspended" && ! annotationBoolVal (logger , & cluster , oauthAnnotation ) && isOpenShift {
177
+ logger .Info ("Creating Dashboard Route" )
178
+ _ , err := r .routeClient .Routes (cluster .Namespace ).Apply (ctx , createRoute (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
179
+ if err != nil {
180
+ logger .Error (err , "Failed to update Dashboard Route" )
181
+ }
182
+ if isLocalInteractive && ingressDomain != "" {
183
+ logger .Info ("Creating RayClient Route" )
184
+ _ , err := r .routeClient .Routes (cluster .Namespace ).Apply (ctx , createRayClientRoute (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
185
+ if err != nil {
186
+ logger .Error (err , "Failed to update RayClient Route" )
187
+ }
188
+ }
189
+ return ctrl.Result {}, nil
190
+
191
+ } else if cluster .Status .State != "suspended" && ! annotationBoolVal (logger , & cluster , oauthAnnotation ) && ! isOpenShift {
192
+ logger .Info ("Creating Dashboard Ingress" )
193
+ _ , err := r .kubeClient .NetworkingV1 ().Ingresses (cluster .Namespace ).Apply (ctx , createIngressApplyConfiguration (& cluster , ingressHost ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
194
+ if err != nil {
195
+ // This log is info level since errors are not fatal and are expected
196
+ logger .Info ("WARN: Failed to update Dashboard Ingress" , "error" , err .Error (), logRequeueing , true )
197
+ }
198
+ if isLocalInteractive && ingressDomain != "" {
199
+ logger .Info ("Creating RayClient Ingress" )
200
+ _ , err := r .kubeClient .NetworkingV1 ().Ingresses (cluster .Namespace ).Apply (ctx , createRayClientIngress (& cluster ), metav1.ApplyOptions {FieldManager : controllerName , Force : true })
201
+ if err != nil {
202
+ logger .Error (err , "Failed to update RayClient Ingress" )
203
+ }
204
+ }
205
+ return ctrl.Result {}, nil
156
206
}
157
207
158
208
return ctrl.Result {}, nil
159
209
}
160
210
211
+ func annotationBoolVal (logger logr.Logger , cluster * rayv1.RayCluster , annotation string ) bool {
212
+ val := cluster .ObjectMeta .Annotations [annotation ]
213
+ boolVal , err := strconv .ParseBool (val )
214
+ if err != nil {
215
+ logger .Error (err , "Could not convert" , annotation , "value to bool" , val )
216
+ }
217
+ if boolVal {
218
+ return true
219
+ } else {
220
+ return false
221
+ }
222
+ }
223
+
161
224
func crbNameFromCluster (cluster * rayv1.RayCluster ) string {
162
225
return cluster .Name + "-" + cluster .Namespace + "-auth" // NOTE: potential naming conflicts ie {name: foo, ns: bar-baz} and {name: foo-bar, ns: baz}
163
226
}
@@ -193,19 +256,23 @@ func desiredServiceAccount(cluster *rayv1.RayCluster) *coreapply.ServiceAccountA
193
256
WithAnnotations (map [string ]string {
194
257
"serviceaccounts.openshift.io/oauth-redirectreference.first" : "" +
195
258
`{"kind":"OAuthRedirectReference","apiVersion":"v1",` +
196
- `"reference":{"kind":"Route","name":"` + routeNameFromCluster (cluster ) + `"}}` ,
259
+ `"reference":{"kind":"Route","name":"` + dashboardNameFromCluster (cluster ) + `"}}` ,
197
260
}).
198
261
WithOwnerReferences (
199
262
v1 .OwnerReference ().WithUID (cluster .UID ).WithName (cluster .Name ).WithKind (cluster .Kind ).WithAPIVersion (cluster .APIVersion ),
200
263
)
201
264
}
202
265
203
- func routeNameFromCluster (cluster * rayv1.RayCluster ) string {
266
+ func dashboardNameFromCluster (cluster * rayv1.RayCluster ) string {
204
267
return "ray-dashboard-" + cluster .Name
205
268
}
206
269
270
+ func rayClientNameFromCluster (cluster * rayv1.RayCluster ) string {
271
+ return "rayclient-" + cluster .Name
272
+ }
273
+
207
274
func desiredClusterRoute (cluster * rayv1.RayCluster ) * routeapply.RouteApplyConfiguration {
208
- return routeapply .Route (routeNameFromCluster (cluster ), cluster .Namespace ).
275
+ return routeapply .Route (dashboardNameFromCluster (cluster ), cluster .Namespace ).
209
276
WithLabels (map [string ]string {"ray.io/cluster-name" : cluster .Name }).
210
277
WithSpec (routeapply .RouteSpec ().
211
278
WithTo (routeapply .RouteTargetReference ().WithKind ("Service" ).WithName (oauthServiceNameFromCluster (cluster ))).
@@ -283,3 +350,165 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
283
350
For (& rayv1.RayCluster {}).
284
351
Complete (r )
285
352
}
353
+
354
+ func serviceNameFromCluster (cluster * rayv1.RayCluster ) string {
355
+ return cluster .Name + "-head-svc"
356
+ }
357
+
358
+ func createRoute (cluster * rayv1.RayCluster ) * routeapply.RouteApplyConfiguration {
359
+ return routeapply .Route (dashboardNameFromCluster (cluster ), cluster .Namespace ).
360
+ WithLabels (map [string ]string {"ray.io/cluster-name" : cluster .Name }).
361
+ WithSpec (routeapply .RouteSpec ().
362
+ WithTo (routeapply .RouteTargetReference ().WithKind ("Service" ).WithName (serviceNameFromCluster (cluster ))).
363
+ WithPort (routeapply .RoutePort ().WithTargetPort (intstr .FromString (RegularServicePortName ))).
364
+ WithTLS (routeapply .TLSConfig ().
365
+ WithTermination ("edge" )),
366
+ ).
367
+ WithOwnerReferences (
368
+ v1 .OwnerReference ().WithUID (cluster .UID ).WithName (cluster .Name ).WithKind (cluster .Kind ).WithAPIVersion (cluster .APIVersion ),
369
+ )
370
+ }
371
+
372
+ func createRayClientRoute (cluster * rayv1.RayCluster ) * routeapply.RouteApplyConfiguration {
373
+ ingress_domain := cluster .ObjectMeta .Annotations ["sdk.codeflare.dev/ingress_domain" ]
374
+ return routeapply .Route (rayClientNameFromCluster (cluster ), cluster .Namespace ).
375
+ WithLabels (map [string ]string {"ray.io/cluster-name" : cluster .Name }).
376
+ WithSpec (routeapply .RouteSpec ().
377
+ WithHost (rayClientNameFromCluster (cluster ) + "-" + cluster .Namespace + "." + ingress_domain ).
378
+ WithTo (routeapply .RouteTargetReference ().WithKind ("Service" ).WithName (serviceNameFromCluster (cluster )).WithWeight (100 )).
379
+ WithPort (routeapply .RoutePort ().WithTargetPort (intstr .FromString ("client" ))).
380
+ WithTLS (routeapply .TLSConfig ().WithTermination ("passthrough" )),
381
+ ).
382
+ WithOwnerReferences (
383
+ v1 .OwnerReference ().WithUID (cluster .UID ).WithName (cluster .Name ).WithKind (cluster .Kind ).WithAPIVersion (cluster .APIVersion ),
384
+ )
385
+ }
386
+
387
+ // Create an Ingress object for the RayCluster
388
+ func createRayClientIngress (cluster * rayv1.RayCluster ) * networkingv1ac.IngressApplyConfiguration {
389
+ ingress_domain := cluster .ObjectMeta .Annotations ["sdk.codeflare.dev/ingress_domain" ]
390
+ return networkingv1ac .Ingress (rayClientNameFromCluster (cluster ), cluster .Namespace ).
391
+ WithLabels (map [string ]string {"ray.io/cluster-name" : cluster .Name }).
392
+ WithAnnotations (map [string ]string {
393
+ "nginx.ingress.kubernetes.io/rewrite-target" : "/" ,
394
+ "nginx.ingress.kubernetes.io/ssl-redirect" : "true" ,
395
+ "nginx.ingress.kubernetes.io/ssl-passthrough" : "true" ,
396
+ }).
397
+ WithOwnerReferences (v1 .OwnerReference ().
398
+ WithAPIVersion (cluster .APIVersion ).
399
+ WithKind (cluster .Kind ).
400
+ WithName (cluster .Name ).
401
+ WithUID (types .UID (cluster .UID ))).
402
+ WithSpec (networkingv1ac .IngressSpec ().
403
+ WithIngressClassName ("nginx" ).
404
+ WithRules (networkingv1ac .IngressRule ().
405
+ WithHost (rayClientNameFromCluster (cluster ) + "-" + cluster .Namespace + "." + ingress_domain ).
406
+ WithHTTP (networkingv1ac .HTTPIngressRuleValue ().
407
+ WithPaths (networkingv1ac .HTTPIngressPath ().
408
+ WithPath ("/" ).
409
+ WithPathType (networkingv1 .PathTypeImplementationSpecific ).
410
+ WithBackend (networkingv1ac .IngressBackend ().
411
+ WithService (networkingv1ac .IngressServiceBackend ().
412
+ WithName (serviceNameFromCluster (cluster )).
413
+ WithPort (networkingv1ac .ServiceBackendPort ().
414
+ WithNumber (10001 ),
415
+ ),
416
+ ),
417
+ ),
418
+ ),
419
+ ),
420
+ ),
421
+ )
422
+ // Optionally, add TLS configuration here if needed
423
+ }
424
+
425
+ // Create an Ingress object for the RayCluster
426
+ func createIngressApplyConfiguration (cluster * rayv1.RayCluster , ingressHost string ) * networkingv1ac.IngressApplyConfiguration {
427
+ return networkingv1ac .Ingress (dashboardNameFromCluster (cluster ), cluster .Namespace ).
428
+ WithLabels (map [string ]string {"ray.io/cluster-name" : cluster .Name }).
429
+ WithOwnerReferences (v1 .OwnerReference ().
430
+ WithAPIVersion (cluster .APIVersion ).
431
+ WithKind (cluster .Kind ).
432
+ WithName (cluster .Name ).
433
+ WithUID (types .UID (cluster .UID ))).
434
+ WithSpec (networkingv1ac .IngressSpec ().
435
+ WithRules (networkingv1ac .IngressRule ().
436
+ WithHost (ingressHost ). // kind host name or ingress_domain
437
+ WithHTTP (networkingv1ac .HTTPIngressRuleValue ().
438
+ WithPaths (networkingv1ac .HTTPIngressPath ().
439
+ WithPath ("/" ).
440
+ WithPathType (networkingv1 .PathTypePrefix ).
441
+ WithBackend (networkingv1ac .IngressBackend ().
442
+ WithService (networkingv1ac .IngressServiceBackend ().
443
+ WithName (serviceNameFromCluster (cluster )).
444
+ WithPort (networkingv1ac .ServiceBackendPort ().
445
+ WithName (RegularServicePortName ),
446
+ ),
447
+ ),
448
+ ),
449
+ ),
450
+ ),
451
+ ),
452
+ )
453
+ // Optionally, add TLS configuration here if needed
454
+ }
455
+
456
+ // isOnKindCluster checks if the current cluster is a KinD cluster.
457
+ // It searches for a node with a label commonly used by KinD clusters.
458
+ func isOnKindCluster (clientset * kubernetes.Clientset ) (bool , error ) {
459
+ nodes , err := clientset .CoreV1 ().Nodes ().List (context .TODO (), metav1.ListOptions {
460
+ LabelSelector : "kubernetes.io/hostname=kind-control-plane" ,
461
+ })
462
+ if err != nil {
463
+ return false , err
464
+ }
465
+ // If we find one or more nodes with the label, assume it's a KinD cluster.
466
+ return len (nodes .Items ) > 0 , nil
467
+ }
468
+
469
+ // getDiscoveryClient returns a discovery client for the current reconciler
470
+ func getDiscoveryClient (config * rest.Config ) (* discovery.DiscoveryClient , error ) {
471
+ return discovery .NewDiscoveryClientForConfig (config )
472
+ }
473
+
474
+ // Check where we are running. We are trying to distinguish here whether
475
+ // this is vanilla kubernetes cluster or Openshift
476
+ func getClusterType (logger logr.Logger , clientset * kubernetes.Clientset , cluster * rayv1.RayCluster ) (bool , string ) {
477
+ // The discovery package is used to discover APIs supported by a Kubernetes API server.
478
+ ingress_domain := cluster .ObjectMeta .Annotations ["sdk.codeflare.dev/ingress_domain" ]
479
+ config , err := ctrl .GetConfig ()
480
+ if err == nil && config != nil {
481
+ dclient , err := getDiscoveryClient (config )
482
+ if err == nil && dclient != nil {
483
+ apiGroupList , err := dclient .ServerGroups ()
484
+ if err != nil {
485
+ logger .Info ("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes" )
486
+ return false , ""
487
+ } else {
488
+ for i := 0 ; i < len (apiGroupList .Groups ); i ++ {
489
+ if strings .HasSuffix (apiGroupList .Groups [i ].Name , ".openshift.io" ) {
490
+ logger .Info ("We detected being on OpenShift!" )
491
+ return true , ""
492
+ }
493
+ }
494
+ onKind , _ := isOnKindCluster (clientset )
495
+ if onKind && ingress_domain == "" {
496
+ logger .Info ("We detected being on a KinD cluster!" )
497
+ return false , "kind"
498
+ } else {
499
+ logger .Info ("We detected being on Vanilla Kubernetes!" )
500
+ return false , fmt .Sprintf ("ray-dashboard-%s-%s.%s" , cluster .Name , cluster .Namespace , ingress_domain )
501
+ }
502
+ }
503
+ } else {
504
+ logger .Info ("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes" )
505
+ return false , fmt .Sprintf ("ray-dashboard-%s-%s.%s" , cluster .Name , cluster .Namespace , ingress_domain )
506
+ }
507
+ } else {
508
+ logger .Info ("Cannot retrieve config, assuming we're on Vanilla Kubernetes" )
509
+ return false , fmt .Sprintf ("ray-dashboard-%s-%s.%s" , cluster .Name , cluster .Namespace , ingress_domain )
510
+ }
511
+ }
512
+
513
+ // No more ingress_options - Removing completely.
514
+ // What to do about ingress_domain? Needed for local_interactive?
0 commit comments