Skip to content

Commit 7ec3c3e

Browse files
authored
Recognize failed PyTorchJobs and skip pod-level gracePeriod (#163)
1 parent e4b6aca commit 7ec3c3e

File tree

2 files changed

+115
-12
lines changed

2 files changed

+115
-12
lines changed

internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
apierrors "k8s.io/apimachinery/pkg/api/errors"
2727
"k8s.io/apimachinery/pkg/api/meta"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2930
"k8s.io/apimachinery/pkg/runtime"
3031
"k8s.io/apimachinery/pkg/types"
3132

@@ -66,6 +67,7 @@ type podStatusSummary struct {
6667
type componentStatusSummary struct {
6768
expected int32
6869
deployed int32
70+
failed int32
6971
}
7072

7173
// permission to fully control appwrappers
@@ -240,6 +242,18 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
240242
return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperFailed)
241243
}
242244

245+
// If a component's controller has put it into a failed state, we do not need
246+
// to allow any further grace period. The situation will not self-correct.
247+
if compStatus.failed > 0 {
248+
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
249+
Type: string(workloadv1beta2.Unhealthy),
250+
Status: metav1.ConditionTrue,
251+
Reason: "FailedComponent",
252+
Message: fmt.Sprintf("Found %v failed components", compStatus.failed),
253+
})
254+
return r.resetOrFail(ctx, aw)
255+
}
256+
243257
// Second, check the Pod-level status of the workload
244258
podStatus, err := r.getPodStatus(ctx, aw)
245259
if err != nil {
@@ -501,20 +515,66 @@ func (r *AppWrapperReconciler) getComponentStatus(ctx context.Context, aw *workl
501515

502516
for componentIdx := range aw.Status.ComponentStatus {
503517
cs := &aw.Status.ComponentStatus[componentIdx]
504-
obj := &metav1.PartialObjectMetadata{TypeMeta: metav1.TypeMeta{Kind: cs.Kind, APIVersion: cs.APIVersion}}
505-
if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil {
506-
if obj.DeletionTimestamp.IsZero() {
507-
summary.deployed += 1
518+
switch cs.Kind {
519+
case "PyTorchJob":
520+
obj := &unstructured.Unstructured{}
521+
obj.SetAPIVersion(cs.APIVersion)
522+
obj.SetKind(cs.Kind)
523+
if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil {
524+
if obj.GetDeletionTimestamp().IsZero() {
525+
summary.deployed += 1
526+
527+
// PyTorchJob is failed if status.Conditions contains an entry with type "Failed" and status "True"
528+
status, ok := obj.UnstructuredContent()["status"]
529+
if !ok {
530+
continue
531+
}
532+
cond, ok := status.(map[string]interface{})["conditions"]
533+
if !ok {
534+
continue
535+
}
536+
condArray, ok := cond.([]interface{})
537+
if !ok {
538+
continue
539+
}
540+
for _, aCond := range condArray {
541+
if condMap, ok := aCond.(map[string]interface{}); ok {
542+
if condType, ok := condMap["type"]; ok && condType.(string) == "Failed" {
543+
if status, ok := condMap["status"]; ok && status.(string) == "True" {
544+
summary.failed += 1
545+
}
546+
}
547+
}
548+
}
549+
}
550+
} else {
551+
if apierrors.IsNotFound(err) {
552+
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
553+
Type: string(workloadv1beta2.Unhealthy),
554+
Status: metav1.ConditionTrue,
555+
Reason: "ComponentNotFound",
556+
})
557+
} else {
558+
return nil, err
559+
}
508560
}
509-
} else {
510-
if apierrors.IsNotFound(err) {
511-
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
512-
Type: string(workloadv1beta2.Unhealthy),
513-
Status: metav1.ConditionTrue,
514-
Reason: "ComponentNotFound",
515-
})
561+
562+
default:
563+
obj := &metav1.PartialObjectMetadata{TypeMeta: metav1.TypeMeta{Kind: cs.Kind, APIVersion: cs.APIVersion}}
564+
if err := r.Get(ctx, types.NamespacedName{Name: cs.Name, Namespace: aw.Namespace}, obj); err == nil {
565+
if obj.GetDeletionTimestamp().IsZero() {
566+
summary.deployed += 1
567+
}
516568
} else {
517-
return nil, err
569+
if apierrors.IsNotFound(err) {
570+
meta.SetStatusCondition(&aw.Status.ComponentStatus[componentIdx].Conditions, metav1.Condition{
571+
Type: string(workloadv1beta2.Unhealthy),
572+
Status: metav1.ConditionTrue,
573+
Reason: "ComponentNotFound",
574+
})
575+
} else {
576+
return nil, err
577+
}
518578
}
519579
}
520580
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
apiVersion: workload.codeflare.dev/v1beta2
2+
kind: AppWrapper
3+
metadata:
4+
name: sample-failing-pytorch-job
5+
labels:
6+
kueue.x-k8s.io/queue-name: user-queue
7+
spec:
8+
components:
9+
- template:
10+
apiVersion: "kubeflow.org/v1"
11+
kind: PyTorchJob
12+
metadata:
13+
name: pytorch-simple
14+
spec:
15+
pytorchReplicaSpecs:
16+
Master:
17+
replicas: 1
18+
restartPolicy: Never
19+
template:
20+
spec:
21+
containers:
22+
- name: pytorch
23+
image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1
24+
command:
25+
- "python3"
26+
- "/opt/pytorch-mnist/mnist.py"
27+
- "--epochs=1"
28+
resources:
29+
requests:
30+
cpu: 1
31+
Worker:
32+
replicas: 1
33+
restartPolicy: Never
34+
template:
35+
spec:
36+
containers:
37+
- name: pytorch
38+
image: docker.io/kubeflowkatib/pytorch-mnist-cpu:v1beta1-fc858d1
39+
command:
40+
- sleep 10; exit 1
41+
resources:
42+
requests:
43+
cpu: 1

0 commit comments

Comments
 (0)