Skip to content

Commit 2c010a9

Browse files
committed
component-level failure detection for RayCluster and RayJob
1 parent f696d26 commit 2c010a9

File tree

1 file changed

+59
-7
lines changed

1 file changed

+59
-7
lines changed

internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -511,12 +511,32 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
511511
return summary, nil
512512
}
513513

514+
//gocyclo:ignore
514515
func (r *AppWrapperReconciler) getComponentStatus(ctx context.Context, aw *workloadv1beta2.AppWrapper) (*componentStatusSummary, error) {
515516
summary := &componentStatusSummary{expected: int32(len(aw.Status.ComponentStatus))}
516517

517518
for componentIdx := range aw.Status.ComponentStatus {
518519
cs := &aw.Status.ComponentStatus[componentIdx]
519520
switch cs.APIVersion + ":" + cs.Kind {
521+
522+
case "batch/v1:Job":
523+
obj := &batchv1.Job{}
524+
if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil {
525+
if obj.GetDeletionTimestamp().IsZero() {
526+
summary.deployed += 1
527+
528+
// batch/v1 Jobs are failed when status.Conditions contains an entry with type "Failed" and status "True"
529+
for _, jc := range obj.Status.Conditions {
530+
if jc.Type == batchv1.JobFailed && jc.Status == v1.ConditionTrue {
531+
summary.failed += 1
532+
}
533+
}
534+
}
535+
536+
} else if !apierrors.IsNotFound(err) {
537+
return nil, err
538+
}
539+
520540
case "kubeflow.org/v1:PyTorchJob":
521541
obj := &unstructured.Unstructured{}
522542
obj.SetAPIVersion(cs.APIVersion)
@@ -552,20 +572,52 @@ func (r *AppWrapperReconciler) getComponentStatus(ctx context.Context, aw *workl
552572
return nil, err
553573
}
554574

555-
case "batch/v1:Job":
556-
obj := &batchv1.Job{}
575+
case "ray.io/v1:RayCluster":
576+
obj := &unstructured.Unstructured{}
577+
obj.SetAPIVersion(cs.APIVersion)
578+
obj.SetKind(cs.Kind)
557579
if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil {
558580
if obj.GetDeletionTimestamp().IsZero() {
559581
summary.deployed += 1
560582

561-
// batch/v1 Jobs are failed when status.Conditions contains an entry with type "Failed" and status "True"
562-
for _, jc := range obj.Status.Conditions {
563-
if jc.Type == batchv1.JobFailed && jc.Status == v1.ConditionTrue {
564-
summary.failed += 1
565-
}
583+
// RayCluster is failed if status.State is "failed"
584+
status, ok := obj.UnstructuredContent()["status"]
585+
if !ok {
586+
continue
587+
}
588+
state, ok := status.(map[string]interface{})["state"]
589+
if !ok {
590+
continue
591+
}
592+
if state.(string) == "failed" {
593+
summary.failed += 1
566594
}
567595
}
596+
} else if !apierrors.IsNotFound(err) {
597+
return nil, err
598+
}
599+
600+
case "ray.io/v1:RayJob":
601+
obj := &unstructured.Unstructured{}
602+
obj.SetAPIVersion(cs.APIVersion)
603+
obj.SetKind(cs.Kind)
604+
if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil {
605+
if obj.GetDeletionTimestamp().IsZero() {
606+
summary.deployed += 1
568607

608+
// RayJob is failed if status.jobsStatus is "FAILED"
609+
status, ok := obj.UnstructuredContent()["status"]
610+
if !ok {
611+
continue
612+
}
613+
jobStatus, ok := status.(map[string]interface{})["jobStatus"]
614+
if !ok {
615+
continue
616+
}
617+
if jobStatus.(string) == "FAILED" {
618+
summary.failed += 1
619+
}
620+
}
569621
} else if !apierrors.IsNotFound(err) {
570622
return nil, err
571623
}

0 commit comments

Comments
 (0)