Skip to content

Commit 435a10e

Browse files
committed
graceful handling of Kueue not being installed before the codeflare operator
1 parent a2cf688 commit 435a10e

File tree

2 files changed

+113
-62
lines changed

2 files changed

+113
-62
lines changed

.github/workflows/olm_tests.yaml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,6 @@ jobs:
5454
- name: Setup and start KinD cluster
5555
uses: ./common/github-actions/kind
5656

57-
- name: Deploy Kueue
58-
run: |
59-
make kueue-e2e
60-
6157
- name: Install OLM
6258
run: |
6359
kubectl create -f https://github.com/operator-framework/operator-lifecycle-manager/releases/download/${OLM_VERSION}/crds.yaml

main.go

Lines changed: 113 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ var (
7777
AppWrapperVersion = "UNKNOWN"
7878
)
7979

80+
const (
81+
workloadAPI = "workloads.kueue.x-k8s.io"
82+
rayclusterAPI = "rayclusters.ray.io"
83+
)
84+
8085
func init() {
8186
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
8287
// Ray
@@ -115,7 +120,7 @@ func main() {
115120
"date", BuildDate,
116121
)
117122

118-
ctx := ctrl.SetupSignalHandler()
123+
ctx, cancel := context.WithCancel(ctrl.SetupSignalHandler())
119124

120125
cfg := &config.CodeFlareOperatorConfiguration{
121126
ClientConnection: &config.ClientConnection{
@@ -188,7 +193,7 @@ func main() {
188193
}
189194

190195
setupLog.Info("setting up indexers")
191-
exitOnError(setupIndexers(ctx, mgr, cfg), "unable to setup indexers")
196+
exitOnError(setupWorkloadIndexer(ctx, cancel, mgr, cfg), "unable to setup indexers")
192197

193198
setupLog.Info("setting up health endpoints")
194199
exitOnError(setupProbeEndpoints(mgr, cfg, certsReady), "unable to set up health check")
@@ -197,7 +202,7 @@ func main() {
197202
go waitForRayClusterAPIandSetupController(ctx, mgr, cfg, isOpenShift(ctx, kubeClient.DiscoveryClient), certsReady)
198203

199204
setupLog.Info("setting up AppWrapper controller")
200-
go setupAppWrapperController(mgr, cfg, certsReady)
205+
go waitForWorkloadAPIAndSetupAppWrapperController(ctx, mgr, cfg, certsReady)
201206

202207
setupLog.Info("starting manager")
203208
exitOnError(mgr.Start(ctx), "error running manager")
@@ -222,75 +227,60 @@ func setupRayClusterController(mgr ctrl.Manager, cfg *config.CodeFlareOperatorCo
222227
return rayClusterController.SetupWithManager(mgr)
223228
}
224229

225-
// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch
226-
227230
func waitForRayClusterAPIandSetupController(ctx context.Context, mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, isOpenShift bool, certsReady chan struct{}) {
228-
crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
229-
exitOnError(err, "unable to create CRD client")
230-
231-
crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
232-
exitOnError(err, "unable to list CRDs")
233-
234-
if slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool {
235-
return crd.Name == "rayclusters.ray.io"
236-
}) {
231+
if isAPIAvailable(ctx, mgr, rayclusterAPI) {
237232
exitOnError(setupRayClusterController(mgr, cfg, isOpenShift, certsReady), "unable to setup RayCluster controller")
238-
}
239-
240-
retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
241-
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
242-
return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
243-
},
244-
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
245-
return crdClient.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{})
246-
},
247-
})
248-
exitOnError(err, "unable to create retry watcher")
249-
250-
defer retryWatcher.Stop()
251-
for {
252-
select {
253-
case <-ctx.Done():
254-
return
255-
case event := <-retryWatcher.ResultChan():
256-
switch event.Type {
257-
case watch.Error:
258-
exitOnError(apierrors.FromObject(event.Object), "error watching for RayCluster API")
259-
260-
case watch.Added, watch.Modified:
261-
if crd := event.Object.(*apiextensionsv1.CustomResourceDefinition); crd.Name == "rayclusters.ray.io" &&
262-
slices.ContainsFunc(crd.Status.Conditions, func(condition apiextensionsv1.CustomResourceDefinitionCondition) bool {
263-
return condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue
264-
}) {
265-
setupLog.Info("RayCluster API installed, setting up controller")
266-
exitOnError(setupRayClusterController(mgr, cfg, isOpenShift, certsReady), "unable to setup RayCluster controller")
267-
return
268-
}
269-
}
270-
}
233+
} else {
234+
waitForAPI(ctx, mgr, rayclusterAPI, func() {
235+
exitOnError(setupRayClusterController(mgr, cfg, isOpenShift, certsReady), "unable to setup RayCluster controller")
236+
})
271237
}
272238
}
273239

274-
func setupAppWrapperController(mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, certsReady chan struct{}) {
240+
func setupAppWrapperController(mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, certsReady chan struct{}) error {
275241
setupLog.Info("Waiting for certificate generation to complete")
276242
<-certsReady
277243
setupLog.Info("Certs ready")
278244

279-
if cfg.AppWrapper != nil && ptr.Deref(cfg.AppWrapper.Enabled, false) {
280-
exitOnError(awctrl.SetupWebhooks(mgr, cfg.AppWrapper.Config), "error setting up AppWrapper webhook")
281-
exitOnError(awctrl.SetupControllers(mgr, cfg.AppWrapper.Config), "error setting up AppWrapper controller")
245+
setupLog.Info("Setting up AppWrapper webhook and controller")
246+
if err := awctrl.SetupWebhooks(mgr, cfg.AppWrapper.Config); err != nil {
247+
return err
248+
}
249+
return awctrl.SetupControllers(mgr, cfg.AppWrapper.Config)
250+
}
251+
252+
func waitForWorkloadAPIAndSetupAppWrapperController(ctx context.Context, mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration, certsReady chan struct{}) {
253+
if cfg.AppWrapper == nil || !ptr.Deref(cfg.AppWrapper.Enabled, false) {
254+
setupLog.Info("AppWrapper controller disabled by config")
255+
}
256+
257+
if isAPIAvailable(ctx, mgr, workloadAPI) {
258+
exitOnError(setupAppWrapperController(mgr, cfg, certsReady), "unable to setup AppWrapper controller")
282259
} else {
283-
setupLog.Info("AppWrapper controller disabled", "Config flag value", *cfg.AppWrapper.Enabled)
260+
waitForAPI(ctx, mgr, workloadAPI, func() {
261+
exitOnError(setupAppWrapperController(mgr, cfg, certsReady), "unable to setup AppWrapper controller")
262+
})
284263
}
285264
}
286265

287-
func setupIndexers(ctx context.Context, mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration) error {
288-
if cfg.AppWrapper != nil && ptr.Deref(cfg.AppWrapper.Enabled, false) {
289-
if err := awctrl.SetupIndexers(ctx, mgr, cfg.AppWrapper.Config); err != nil {
290-
return fmt.Errorf("workload indexer: %w", err)
291-
}
266+
func setupWorkloadIndexer(ctx context.Context, cancel context.CancelFunc, mgr ctrl.Manager, cfg *config.CodeFlareOperatorConfiguration) error {
267+
if cfg.AppWrapper == nil || !ptr.Deref(cfg.AppWrapper.Enabled, false) {
268+
setupLog.Info("Workload indexer disabled by config")
269+
return nil
270+
}
271+
272+
if isAPIAvailable(ctx, mgr, workloadAPI) {
273+
return awctrl.SetupIndexers(ctx, mgr, cfg.AppWrapper.Config)
274+
} else {
275+
// If AppWrappers are enabled and the Workload API becomes available later, initiate an orderly
276+
// restart of the codeflare operator to enable the workload indexer to be setup in the the new instance of the operator.
277+
// It is not possible to add an indexer once the mgr has started so, a restart if the only avenue.
278+
go waitForAPI(ctx, mgr, workloadAPI, func() {
279+
setupLog.Info("Workload API now available; triggering controller restart")
280+
cancel()
281+
})
282+
return nil
292283
}
293-
return nil
294284
}
295285

296286
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;update
@@ -437,3 +427,68 @@ func getClusterDomain(ctx context.Context, configClient *clientset.Clientset) (s
437427

438428
return domain, nil
439429
}
430+
431+
// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch
432+
433+
func isAPIAvailable(ctx context.Context, mgr ctrl.Manager, apiName string) bool {
434+
crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
435+
exitOnError(err, "unable to create CRD client")
436+
437+
crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
438+
exitOnError(err, "unable to list CRDs")
439+
440+
return slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool {
441+
return crd.Name == apiName
442+
})
443+
}
444+
445+
func waitForAPI(ctx context.Context, mgr ctrl.Manager, apiName string, action func()) {
446+
crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
447+
exitOnError(err, "unable to create CRD client")
448+
449+
crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
450+
exitOnError(err, "unable to list CRDs")
451+
452+
// If API is already available, just invoke action
453+
if slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool {
454+
return crd.Name == apiName
455+
}) {
456+
action()
457+
return
458+
}
459+
460+
// Wait for the API to become available then invoke action
461+
setupLog.Info(fmt.Sprintf("API %v not available, setting up retry watcher", apiName))
462+
retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
463+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
464+
return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
465+
},
466+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
467+
return crdClient.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, metav1.ListOptions{})
468+
},
469+
})
470+
exitOnError(err, "unable to create retry watcher")
471+
472+
defer retryWatcher.Stop()
473+
for {
474+
select {
475+
case <-ctx.Done():
476+
return
477+
case event := <-retryWatcher.ResultChan():
478+
switch event.Type {
479+
case watch.Error:
480+
exitOnError(apierrors.FromObject(event.Object), fmt.Sprintf("error watching for API %v", apiName))
481+
482+
case watch.Added, watch.Modified:
483+
if crd := event.Object.(*apiextensionsv1.CustomResourceDefinition); crd.Name == apiName &&
484+
slices.ContainsFunc(crd.Status.Conditions, func(condition apiextensionsv1.CustomResourceDefinitionCondition) bool {
485+
return condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue
486+
}) {
487+
setupLog.Info(fmt.Sprintf("API %v installed, invoking deferred action", apiName))
488+
action()
489+
return
490+
}
491+
}
492+
}
493+
}
494+
}

0 commit comments

Comments
 (0)