diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index 0fc1ec88b..2e8fdbc1f 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -60,6 +60,7 @@ jobs: run: | echo Deploying CodeFlare operator IMG="${REGISTRY_ADDRESS}"/codeflare-operator + sed -i 's/RayDashboardOAuthEnabled: pointer.Bool(true)/RayDashboardOAuthEnabled: pointer.Bool(false)/' main.go make image-push -e IMG="${IMG}" make deploy -e IMG="${IMG}" -e ENV="e2e" kubectl wait --timeout=120s --for=condition=Available=true deployment -n openshift-operators codeflare-operator-manager diff --git a/main.go b/main.go index 05a53f074..658b55e90 100644 --- a/main.go +++ b/main.go @@ -188,8 +188,8 @@ func main() { } v, err := HasAPIResourceForGVK(kubeClient.DiscoveryClient, rayv1.GroupVersion.WithKind("RayCluster")) - if v && *cfg.KubeRay.RayDashboardOAuthEnabled { - rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme()} + if v { + rayClusterController := controllers.RayClusterReconciler{Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Config: cfg} exitOnError(rayClusterController.SetupWithManager(mgr), "Error setting up RayCluster controller") } else if err != nil { exitOnError(err, "Could not determine if RayCluster CR present on cluster.") diff --git a/pkg/controllers/raycluster_controller.go b/pkg/controllers/raycluster_controller.go index 9e45bf78d..23104c2ab 100644 --- a/pkg/controllers/raycluster_controller.go +++ b/pkg/controllers/raycluster_controller.go @@ -41,6 +41,8 @@ import ( routev1 "github.com/openshift/api/route/v1" routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1" routev1client "github.com/openshift/client-go/route/clientset/versioned/typed/route/v1" + + "github.com/project-codeflare/codeflare-operator/pkg/config" ) // RayClusterReconciler reconciles a RayCluster object @@ -50,15 +52,17 @@ type RayClusterReconciler struct { routeClient *routev1client.RouteV1Client Scheme *runtime.Scheme CookieSalt string + Config *config.CodeFlareOperatorConfiguration } const ( - requeueTime = 10 - controllerName = "codeflare-raycluster-controller" - oAuthFinalizer = "ray.openshift.ai/oauth-finalizer" - oAuthServicePort = 443 - oAuthServicePortName = "oauth-proxy" - logRequeueing = "requeueing" + requeueTime = 10 + controllerName = "codeflare-raycluster-controller" + oAuthFinalizer = "ray.openshift.ai/oauth-finalizer" + oAuthServicePort = 443 + oAuthServicePortName = "oauth-proxy" + ingressServicePortName = "dashboard" + logRequeueing = "requeueing" ) var ( @@ -97,6 +101,10 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, client.IgnoreNotFound(err) } + isLocalInteractive := annotationBoolVal(ctx, &cluster, "sdk.codeflare.dev/local_interactive", false) + ingressDomain := "" // FIX - CFO will retrieve it. + isOpenShift, ingressHost := getClusterType(ctx, r.kubeClient, &cluster, ingressDomain) + if cluster.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(&cluster, oAuthFinalizer) { logger.Info("Add a finalizer", "finalizer", oAuthFinalizer) @@ -130,29 +138,63 @@ func (r *RayClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update OAuth Route") - } + if cluster.Status.State != "suspended" && r.isRayDashboardOAuthEnabled() && isOpenShift { + logger.Info("Creating OAuth Objects") + _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredClusterRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update OAuth Route") + return ctrl.Result{RequeueAfter: requeueTime}, err + } - _, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to create OAuth Secret") - } + _, err = r.kubeClient.CoreV1().Secrets(cluster.Namespace).Apply(ctx, desiredOAuthSecret(&cluster, r), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to create OAuth Secret") + return ctrl.Result{RequeueAfter: requeueTime}, err + } - _, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update OAuth Service") - } + _, err = r.kubeClient.CoreV1().Services(cluster.Namespace).Apply(ctx, desiredOAuthService(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update OAuth Service") + return ctrl.Result{RequeueAfter: requeueTime}, err + } - _, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update OAuth ServiceAccount") - } + _, err = r.kubeClient.CoreV1().ServiceAccounts(cluster.Namespace).Apply(ctx, desiredServiceAccount(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update OAuth ServiceAccount") + return ctrl.Result{RequeueAfter: requeueTime}, err + } - _, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) - if err != nil { - logger.Error(err, "Failed to update OAuth ClusterRoleBinding") + _, err = r.kubeClient.RbacV1().ClusterRoleBindings().Apply(ctx, desiredOAuthClusterRoleBinding(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update OAuth ClusterRoleBinding") + return ctrl.Result{RequeueAfter: requeueTime}, err + } + + if isLocalInteractive { + logger.Info("Creating RayClient Route") + _, err := r.routeClient.Routes(cluster.Namespace).Apply(ctx, desiredRayClientRoute(&cluster), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update RayClient Route") + return ctrl.Result{RequeueAfter: requeueTime}, err + } + } + + } else if cluster.Status.State != "suspended" && !r.isRayDashboardOAuthEnabled() && !isOpenShift { + logger.Info("Creating Dashboard Ingress") + _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredClusterIngress(&cluster, ingressHost), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + // This log is info level since errors are not fatal and are expected + logger.Info("WARN: Failed to update Dashboard Ingress", "error", err.Error(), logRequeueing, true) + return ctrl.Result{RequeueAfter: requeueTime}, err + } + if isLocalInteractive && ingressDomain != "" { + logger.Info("Creating RayClient Ingress") + _, err := r.kubeClient.NetworkingV1().Ingresses(cluster.Namespace).Apply(ctx, desiredRayClientIngress(&cluster, ingressDomain), metav1.ApplyOptions{FieldManager: controllerName, Force: true}) + if err != nil { + logger.Error(err, "Failed to update RayClient Ingress") + return ctrl.Result{RequeueAfter: requeueTime}, err + } + } } return ctrl.Result{}, nil @@ -193,19 +235,23 @@ func desiredServiceAccount(cluster *rayv1.RayCluster) *coreapply.ServiceAccountA WithAnnotations(map[string]string{ "serviceaccounts.openshift.io/oauth-redirectreference.first": "" + `{"kind":"OAuthRedirectReference","apiVersion":"v1",` + - `"reference":{"kind":"Route","name":"` + routeNameFromCluster(cluster) + `"}}`, + `"reference":{"kind":"Route","name":"` + dashboardNameFromCluster(cluster) + `"}}`, }). WithOwnerReferences( v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), ) } -func routeNameFromCluster(cluster *rayv1.RayCluster) string { +func dashboardNameFromCluster(cluster *rayv1.RayCluster) string { return "ray-dashboard-" + cluster.Name } +func rayClientNameFromCluster(cluster *rayv1.RayCluster) string { + return "rayclient-" + cluster.Name +} + func desiredClusterRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { - return routeapply.Route(routeNameFromCluster(cluster), cluster.Namespace). + return routeapply.Route(dashboardNameFromCluster(cluster), cluster.Namespace). WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). WithSpec(routeapply.RouteSpec(). WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(oauthServiceNameFromCluster(cluster))). diff --git a/pkg/controllers/support.go b/pkg/controllers/support.go new file mode 100644 index 000000000..348cd03de --- /dev/null +++ b/pkg/controllers/support.go @@ -0,0 +1,181 @@ +package controllers + +import ( + "context" + "fmt" + "strconv" + "strings" + + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + + networkingv1 "k8s.io/api/networking/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + v1 "k8s.io/client-go/applyconfigurations/meta/v1" + networkingv1ac "k8s.io/client-go/applyconfigurations/networking/v1" + "k8s.io/client-go/discovery" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + + routeapply "github.com/openshift/client-go/route/applyconfigurations/route/v1" +) + +func serviceNameFromCluster(cluster *rayv1.RayCluster) string { + return cluster.Name + "-head-svc" +} + +func desiredRayClientRoute(cluster *rayv1.RayCluster) *routeapply.RouteApplyConfiguration { + return routeapply.Route(rayClientNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithSpec(routeapply.RouteSpec(). + WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace). + WithTo(routeapply.RouteTargetReference().WithKind("Service").WithName(serviceNameFromCluster(cluster)).WithWeight(100)). + WithPort(routeapply.RoutePort().WithTargetPort(intstr.FromString("client"))). + WithTLS(routeapply.TLSConfig().WithTermination("passthrough")), + ). + WithOwnerReferences( + v1.OwnerReference().WithUID(cluster.UID).WithName(cluster.Name).WithKind(cluster.Kind).WithAPIVersion(cluster.APIVersion), + ) +} + +// Create an Ingress object for the RayCluster +func desiredRayClientIngress(cluster *rayv1.RayCluster, ingressDomain string) *networkingv1ac.IngressApplyConfiguration { + return networkingv1ac.Ingress(rayClientNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithAnnotations(map[string]string{ + "nginx.ingress.kubernetes.io/rewrite-target": "/", + "nginx.ingress.kubernetes.io/ssl-redirect": "true", + "nginx.ingress.kubernetes.io/ssl-passthrough": "true", + }). + WithOwnerReferences(v1.OwnerReference(). + WithAPIVersion(cluster.APIVersion). + WithKind(cluster.Kind). + WithName(cluster.Name). + WithUID(types.UID(cluster.UID))). + WithSpec(networkingv1ac.IngressSpec(). + WithIngressClassName("nginx"). + WithRules(networkingv1ac.IngressRule(). + WithHost(rayClientNameFromCluster(cluster) + "-" + cluster.Namespace + "." + ingressDomain). + WithHTTP(networkingv1ac.HTTPIngressRuleValue(). + WithPaths(networkingv1ac.HTTPIngressPath(). + WithPath("/"). + WithPathType(networkingv1.PathTypeImplementationSpecific). + WithBackend(networkingv1ac.IngressBackend(). + WithService(networkingv1ac.IngressServiceBackend(). + WithName(serviceNameFromCluster(cluster)). + WithPort(networkingv1ac.ServiceBackendPort(). + WithNumber(10001), + ), + ), + ), + ), + ), + ), + ) +} + +// Create an Ingress object for the RayCluster +func desiredClusterIngress(cluster *rayv1.RayCluster, ingressHost string) *networkingv1ac.IngressApplyConfiguration { + return networkingv1ac.Ingress(dashboardNameFromCluster(cluster), cluster.Namespace). + WithLabels(map[string]string{"ray.io/cluster-name": cluster.Name}). + WithOwnerReferences(v1.OwnerReference(). + WithAPIVersion(cluster.APIVersion). + WithKind(cluster.Kind). + WithName(cluster.Name). + WithUID(types.UID(cluster.UID))). + WithSpec(networkingv1ac.IngressSpec(). + WithRules(networkingv1ac.IngressRule(). + WithHost(ingressHost). // KinD hostname or ingressDomain + WithHTTP(networkingv1ac.HTTPIngressRuleValue(). + WithPaths(networkingv1ac.HTTPIngressPath(). + WithPath("/"). + WithPathType(networkingv1.PathTypePrefix). + WithBackend(networkingv1ac.IngressBackend(). + WithService(networkingv1ac.IngressServiceBackend(). + WithName(serviceNameFromCluster(cluster)). + WithPort(networkingv1ac.ServiceBackendPort(). + WithName(ingressServicePortName), + ), + ), + ), + ), + ), + ), + ) +} + +// isOnKindCluster checks if the current cluster is a KinD cluster. +// It searches for a node with a label commonly used by KinD clusters. +func isOnKindCluster(clientset *kubernetes.Clientset) (bool, error) { + nodes, err := clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ + LabelSelector: "kubernetes.io/hostname=kind-control-plane", + }) + if err != nil { + return false, err + } + // If we find one or more nodes with the label, assume it's a KinD cluster. + return len(nodes.Items) > 0, nil +} + +// getDiscoveryClient returns a discovery client for the current reconciler +func getDiscoveryClient(config *rest.Config) (*discovery.DiscoveryClient, error) { + return discovery.NewDiscoveryClientForConfig(config) +} + +// Check where we are running. We are trying to distinguish here whether +// this is vanilla kubernetes cluster or Openshift +func getClusterType(ctx context.Context, clientset *kubernetes.Clientset, cluster *rayv1.RayCluster, ingressDomain string) (bool, string) { + // The discovery package is used to discover APIs supported by a Kubernetes API server. + logger := ctrl.LoggerFrom(ctx) + config, err := ctrl.GetConfig() + if err != nil && config == nil { + logger.Info("Cannot retrieve config, assuming we're on Vanilla Kubernetes") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain) + } + dclient, err := getDiscoveryClient(config) + if err != nil && dclient == nil { + logger.Info("Cannot retrieve a DiscoveryClient, assuming we're on Vanilla Kubernetes") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain) + } + apiGroupList, err := dclient.ServerGroups() + if err != nil { + logger.Info("Error while querying ServerGroups, assuming we're on Vanilla Kubernetes") + return false, "" + } + for i := 0; i < len(apiGroupList.Groups); i++ { + if strings.HasSuffix(apiGroupList.Groups[i].Name, ".openshift.io") { + logger.Info("We detected being on OpenShift!") + return true, "" + } + } + onKind, _ := isOnKindCluster(clientset) + if onKind && ingressDomain == "" { + logger.Info("We detected being on a KinD cluster!") + return false, "kind" + } + logger.Info("We detected being on Vanilla Kubernetes!") + return false, fmt.Sprintf("ray-dashboard-%s-%s.%s", cluster.Name, cluster.Namespace, ingressDomain) +} + +func (r *RayClusterReconciler) isRayDashboardOAuthEnabled() bool { + if r.Config != nil && r.Config.KubeRay != nil && r.Config.KubeRay.RayDashboardOAuthEnabled != nil { + return *r.Config.KubeRay.RayDashboardOAuthEnabled + } + return true +} + +func annotationBoolVal(ctx context.Context, cluster *rayv1.RayCluster, annotation string, defaultValue bool) bool { + logger := ctrl.LoggerFrom(ctx) + val, exists := cluster.ObjectMeta.Annotations[annotation] + if !exists || val == "" { + return defaultValue + } + boolVal, err := strconv.ParseBool(val) + if err != nil { + logger.Error(err, "Could not convert annotation value to bool", "annotation", annotation, "value", val) + return defaultValue + } + return boolVal +}