diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 899e49157..bb419783c 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -32,6 +32,14 @@ rules: - list - update - watch +- apiGroups: + - apiextensions.k8s.io + resources: + - customresourcedefinitions + verbs: + - get + - list + - watch - apiGroups: - authentication.k8s.io resources: diff --git a/go.mod b/go.mod index f8a0da50a..047755ad9 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,9 @@ require ( github.com/project-codeflare/codeflare-common v0.0.0-20240207083912-d7a229270a0a github.com/ray-project/kuberay/ray-operator v1.1.0 go.uber.org/zap v1.26.0 + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 k8s.io/api v0.28.4 + k8s.io/apiextensions-apiserver v0.28.4 k8s.io/apimachinery v0.28.4 k8s.io/client-go v11.0.0+incompatible k8s.io/component-base v0.28.4 @@ -78,7 +80,6 @@ require ( github.com/spf13/pflag v1.0.5 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect golang.org/x/sys v0.18.0 // indirect @@ -92,7 +93,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/apiextensions-apiserver v0.28.4 // indirect k8s.io/kube-openapi v0.0.0-20230901164831-6c774f458599 // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.3.0 // indirect diff --git a/main.go b/main.go index 31afbcbbe..aa6c4dff7 100644 --- a/main.go +++ b/main.go @@ -30,19 +30,24 @@ import ( dsciv1 "github.com/opendatahub-io/opendatahub-operator/v2/apis/dscinitialization/v1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "go.uber.org/zap/zapcore" + "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + 
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + retrywatch "k8s.io/client-go/tools/watch" configv1alpha1 "k8s.io/component-base/config/v1alpha1" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -169,33 +174,81 @@ func main() { exitOnError(err, cfg.KubeRay.IngressDomain) } - go setupControllers(mgr, kubeClient, cfg, isOpenShift(ctx, kubeClient.DiscoveryClient), certsReady) - setupLog.Info("setting up health endpoints") exitOnError(setupProbeEndpoints(mgr, cfg, certsReady), "unable to set up health check") + setupLog.Info("setting up RayCluster controller") + go waitForRayClusterAPIandSetupController(ctx, mgr, cfg, isOpenShift(ctx, kubeClient.DiscoveryClient), certsReady) + setupLog.Info("starting manager") exitOnError(mgr.Start(ctx), "error running manager") } -func setupControllers(mgr ctrl.Manager, dc discovery.DiscoveryInterface, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) { +func setupRayClusterController(mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) error { setupLog.Info("Waiting for certificate generation to complete") <-certsReady setupLog.Info("Certs ready") - exitOnError(controllers.SetupRayClusterWebhookWithManager(mgr, cfg.KubeRay), "error setting up RayCluster webhook") + err := controllers.SetupRayClusterWebhookWithManager(mgr, cfg.KubeRay) + if err != nil { + return err + } + + rayClusterController := 
controllers.RayClusterReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Config: cfg.KubeRay, + IsOpenShift: isOpenShift, + } + return rayClusterController.SetupWithManager(mgr) +} + +// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch + +func waitForRayClusterAPIandSetupController(ctx context.Context, mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) { + crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig()) + exitOnError(err, "unable to create CRD client") + + crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{}) + exitOnError(err, "unable to list CRDs") + + if slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool { + return crd.Name == "rayclusters.ray.io" + }) { + exitOnError(setupRayClusterController(mgr, cfg, isOpenShift, certsReady), "unable to setup RayCluster controller") + // Controller is registered; do not also start the CRD watcher, otherwise a later + // Modified event would attempt a duplicate setup and exitOnError would kill the operator. + return + } + + retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + // Pass options through: RetryWatcher injects ResourceVersion via options. + return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return crdClient.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, options) + }, + }) + exitOnError(err, "unable to create retry watcher") - ok, err := hasAPIResourceForGVK(dc, rayv1.GroupVersion.WithKind("RayCluster")) - if ok { - rayClusterController := controllers.RayClusterReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Config: cfg.KubeRay, - IsOpenShift: isOpenShift, + defer retryWatcher.Stop() + for { + select { + case <-ctx.Done(): + return + case event := <-retryWatcher.ResultChan(): + switch event.Type { + case watch.Error: + exitOnError(apierrors.FromObject(event.Object),
"error watching for RayCluster API") + + case watch.Added, watch.Modified: + if crd := event.Object.(*apiextensionsv1.CustomResourceDefinition); crd.Name == "rayclusters.ray.io" && + slices.ContainsFunc(crd.Status.Conditions, func(condition apiextensionsv1.CustomResourceDefinitionCondition) bool { + return condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue + }) { + setupLog.Info("RayCluster API installed, setting up controller") + exitOnError(setupRayClusterController(mgr, cfg, isOpenShift, certsReady), "unable to setup RayCluster controller") + return + } + } } - exitOnError(rayClusterController.SetupWithManager(mgr), "Error setting up RayCluster controller") - } else if err != nil { - exitOnError(err, "Could not determine if RayCluster CR present on cluster.") } } @@ -289,23 +342,6 @@ func createConfigMap(ctx context.Context, client kubernetes.Interface, ns, name return err } -func hasAPIResourceForGVK(dc discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (bool, error) { - gv, kind := gvk.ToAPIVersionAndKind() - if resources, err := dc.ServerResourcesForGroupVersion(gv); err != nil { - if apierrors.IsNotFound(err) { - return false, nil - } - return false, err - } else { - for _, res := range resources.APIResources { - if res.Kind == kind { - return true, nil - } - } - } - return false, nil -} - func namespaceOrDie() string { // This way assumes you've set the NAMESPACE environment variable either manually, when running // the operator standalone, or using the downward API, when running the operator in-cluster.