From 2094f8699e83d22a3863832329cb4f8ff17615f7 Mon Sep 17 00:00:00 2001 From: David Grove Date: Mon, 24 Jun 2024 11:54:03 -0400 Subject: [PATCH] component-level failure detection for RayCluster and RayJob --- .../appwrapper/appwrapper_controller.go | 66 +++++++++++++++++-- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/internal/controller/appwrapper/appwrapper_controller.go b/internal/controller/appwrapper/appwrapper_controller.go index 7484749..9db3d5c 100644 --- a/internal/controller/appwrapper/appwrapper_controller.go +++ b/internal/controller/appwrapper/appwrapper_controller.go @@ -514,12 +514,32 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b return summary, nil } +//gocyclo:ignore func (r *AppWrapperReconciler) getComponentStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*componentStatusSummary, error) { summary := &componentStatusSummary{expected: int32(len(aw.Status.ComponentStatus))} for componentIdx := range aw.Status.ComponentStatus { cs := &aw.Status.ComponentStatus[componentIdx] switch cs.APIVersion + ":" + cs.Kind { + + case "batch/v1:Job": + obj := &batchv1.Job{} + if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil { + if obj.GetDeletionTimestamp().IsZero() { + summary.deployed += 1 + + // batch/v1 Jobs are failed when status.Conditions contains an entry with type "Failed" and status "True" + for _, jc := range obj.Status.Conditions { + if jc.Type == batchv1.JobFailed && jc.Status == v1.ConditionTrue { + summary.failed += 1 + } + } + } + + } else if !apierrors.IsNotFound(err) { + return nil, err + } + case "kubeflow.org/v1:PyTorchJob": obj := &unstructured.Unstructured{} obj.SetAPIVersion(cs.APIVersion) @@ -555,20 +575,52 @@ func (r *AppWrapperReconciler) getComponentStatus(ctx context.Context, aw *workl return nil, err } - case "batch/v1:Job": - obj := &batchv1.Job{} + case "ray.io/v1:RayCluster": + obj := &unstructured.Unstructured{} + 
obj.SetAPIVersion(cs.APIVersion) + obj.SetKind(cs.Kind) if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil { if obj.GetDeletionTimestamp().IsZero() { summary.deployed += 1 - // batch/v1 Jobs are failed when status.Conditions contains an entry with type "Failed" and status "True" - for _, jc := range obj.Status.Conditions { - if jc.Type == batchv1.JobFailed && jc.Status == v1.ConditionTrue { - summary.failed += 1 - } + // RayCluster is failed if status.State is "failed" + status, ok := obj.UnstructuredContent()["status"] + if !ok { + continue + } + state, ok := status.(map[string]interface{})["state"] + if !ok { + continue + } + if state.(string) == "failed" { + summary.failed += 1 } } + } else if !apierrors.IsNotFound(err) { + return nil, err + } + + case "ray.io/v1:RayJob": + obj := &unstructured.Unstructured{} + obj.SetAPIVersion(cs.APIVersion) + obj.SetKind(cs.Kind) + if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil { + if obj.GetDeletionTimestamp().IsZero() { + summary.deployed += 1 + // RayJob is failed if status.jobStatus is "FAILED" + status, ok := obj.UnstructuredContent()["status"] + if !ok { + continue + } + jobStatus, ok := status.(map[string]interface{})["jobStatus"] + if !ok { + continue + } + if jobStatus.(string) == "FAILED" { + summary.failed += 1 + } + } } else if !apierrors.IsNotFound(err) { return nil, err }