From 807e9259a3d8fcf885419a69cfe6c9e82f4f8be2 Mon Sep 17 00:00:00 2001 From: David Grove Date: Thu, 20 Jun 2024 09:59:50 -0400 Subject: [PATCH] Recognize failed PyTorchJobs and skip pod-level gracePeriod --- .../appwrapper/appwrapper_controller.go | 84 ++++++++++++++++--- samples/wrapped-failing-pytorch-job.yaml | 43 ++++++++++ 2 files changed, 115 insertions(+), 12 deletions(-) create mode 100644 samples/wrapped-failing-pytorch-job.yaml diff --git a/internal/controller/appwrapper/appwrapper_controller.go b/internal/controller/appwrapper/appwrapper_controller.go index bd9b6e1..deae07a 100644 --- a/internal/controller/appwrapper/appwrapper_controller.go +++ b/internal/controller/appwrapper/appwrapper_controller.go @@ -26,6 +26,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -66,6 +67,7 @@ type podStatusSummary struct { type componentStatusSummary struct { expected int32 deployed int32 + failed int32 } // permission to fully control appwrappers @@ -240,6 +242,18 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request) return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed) } + // If a component's controller has put it into a failed state, we do not need + // to allow any further grace period. The situation will not self-correct. + if compStatus.failed > 0 { + meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{ + Type: string(workloadv1beta2.Unhealthy), + Status: metav1.ConditionTrue, + Reason: "FailedComponent", + Message: fmt.Sprintf("Found %v failed components", compStatus.failed), + }) + return r.resetOrFail(ctx, aw) + } + // Second, check the Pod-level status of the workload podStatus, err := r.getPodStatus(ctx, aw) if err != nil { @@ -501,20 +515,66 @@ func (r *AppWrapperReconciler) getComponentStatus(ctx context.Context, aw *workl for componentIdx := range aw.Status.ComponentStatus { cs := &aw.Status.ComponentStatus[componentIdx] - obj := &metav1.PartialObjectMetadata{TypeMeta: metav1.TypeMeta{Kind: cs.Kind, APIVersion: cs.APIVersion}} - if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil { - if obj.DeletionTimestamp.IsZero() { - summary.deployed += 1 + switch cs.Kind { + case "PyTorchJob": + obj := &unstructured.Unstructured{} + obj.SetAPIVersion(cs.APIVersion) + obj.SetKind(cs.Kind) + if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil { + if obj.GetDeletionTimestamp().IsZero() { + summary.deployed += 1 + + // PyTorchJob is failed if status.Conditions contains an entry with type "Failed" and status "True" + status, ok := obj.UnstructuredContent()["status"] + if !ok { + continue + } + cond, ok := status.(map[string]interface{})["conditions"] + if !ok { + continue + } + condArray, ok := cond.([]interface{}) + if !ok { + continue + } + for _, aCond := range condArray { + if condMap, ok := aCond.(map[string]interface{}); ok { + if condType, ok := condMap["type"]; ok && condType.(string) == "Failed" { + if status, ok := condMap["status"]; ok && status.(string) == "True" { + summary.failed += 1 + } + } + } + } + } + } else { + if apierrors.IsNotFound(err) { + meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{ + Type: string(workloadv1beta2.Unhealthy), + Status: metav1.ConditionTrue, + Reason: "ComponentNotFound", + }) + } else { + return nil, err + } } - } else { - if apierrors.IsNotFound(err) { - meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{ - Type: string(workloadv1beta2.Unhealthy), - Status: metav1.ConditionTrue, - Reason: "ComponentNotFound", - }) + + default: + obj := &metav1.PartialObjectMetadata{TypeMeta: metav1.TypeMeta{Kind: cs.Kind, APIVersion: cs.APIVersion}} + if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil { + if obj.GetDeletionTimestamp().IsZero() { + summary.deployed += 1 + } } else { - return nil, err + if apierrors.IsNotFound(err) { + meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{ + Type: string(workloadv1beta2.Unhealthy), + Status: metav1.ConditionTrue, + Reason: "ComponentNotFound", + }) + } else { + return nil, err + } } } } diff --git a/samples/wrapped-failing-pytorch-job.yaml b/samples/wrapped-failing-pytorch-job.yaml new file mode 100644 index 0000000..d7bc7a2 --- /dev/null +++ b/samples/wrapped-failing-pytorch-job.yaml @@ -0,0 +1,43 @@ +apiVersion: workload.codeflare.dev/v1beta2 +kind: AppWrapper +metadata: + name: sample-failing-pytorch-job + labels: + kueue.x-k8s.io/queue-name: user-queue +spec: + components: + - template: + apiVersion: "kubeflow.org/v1" + kind: PyTorchJob + metadata: + name: pytorch-simple + spec: + pytorchReplicaSpecs: + Master: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1 + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=1" + resources: + requests: + cpu: 1 + Worker: + replicas: 1 + restartPolicy: Never + template: + spec: + containers: + - name: pytorch + image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1 + command: + - sleep 10; exit 1 + resources: + requests: + cpu: 1