Skip to content

Commit 20bd912

Browse files
authored
Append autopilot anti-affinities to existing matchExpressions array (#260)
Fixes #259. Fix the actual bug and also add unit tests to cover all PodSpecTemplate injections by the appwrapper controller.
1 parent 20de4e7 commit 20bd912

File tree

3 files changed

+175
-17
lines changed

3 files changed

+175
-17
lines changed

internal/controller/appwrapper/appwrapper_controller_test.go

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
. "github.com/onsi/gomega"
2424
v1 "k8s.io/api/core/v1"
2525
"k8s.io/apimachinery/pkg/api/meta"
26+
"k8s.io/apimachinery/pkg/api/resource"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/client-go/tools/record"
@@ -63,6 +64,8 @@ var _ = Describe("AppWrapper Controller", func() {
6364
awConfig.FaultTolerance.RetryPausePeriod = 0 * time.Second
6465
awConfig.FaultTolerance.RetryLimit = 0
6566
awConfig.FaultTolerance.SuccessTTL = 0 * time.Second
67+
awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"] = append(awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"], v1.Taint{Key: "extra", Value: "test", Effect: v1.TaintEffectNoExecute})
68+
6669
awReconciler = &AppWrapperReconciler{
6770
Client: k8sClient,
6871
Recorder: &record.FakeRecorder{},
@@ -156,6 +159,42 @@ var _ = Describe("AppWrapper Controller", func() {
156159
Expect(finished).Should(BeFalse())
157160
}
158161

162+
// validateMarkers asserts that everything carried by markerPodSet was
// propagated onto the given Pod: each annotation and label must be present
// with the same value, each toleration must appear in the Pod's toleration
// list, and each node selector entry must be present with the same value.
validateMarkers := func(p *v1.Pod) {
	// Annotations and labels live on the Pod metadata.
	for key, want := range markerPodSet.Annotations {
		Expect(p.Annotations).Should(HaveKeyWithValue(key, want))
	}
	for key, want := range markerPodSet.Labels {
		Expect(p.Labels).Should(HaveKeyWithValue(key, want))
	}

	// Tolerations and node selectors live on the Pod spec.
	for _, toleration := range markerPodSet.Tolerations {
		Expect(p.Spec.Tolerations).Should(ContainElement(toleration))
	}
	for key, want := range markerPodSet.NodeSelector {
		Expect(p.Spec.NodeSelector).Should(HaveKeyWithValue(key, want))
	}
}
176+
177+
validateAutopilot := func(p *v1.Pod) {
178+
if p.Spec.Containers[0].Resources.Requests.Name("nvidia.com/gpu", resource.DecimalSI).IsZero() {
179+
Expect(p.Spec.Affinity).Should(BeNil())
180+
} else {
181+
Expect(p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ShouldNot(BeNil())
182+
Expect(p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms).Should(HaveLen(1))
183+
mes := p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
184+
for _, taint := range awReconciler.Config.Autopilot.ResourceTaints["nvidia.com/gpu"] {
185+
found := false
186+
for _, me := range mes {
187+
if me.Key == taint.Key {
188+
Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
189+
Expect(me.Values).Should(ContainElement(taint.Value))
190+
found = true
191+
}
192+
}
193+
Expect(found).Should(BeTrue())
194+
}
195+
}
196+
}
197+
159198
AfterEach(func() {
160199
By("Cleanup the AppWrapper and ensure no Pods remain")
161200
aw := &workloadv1beta2.AppWrapper{}
@@ -318,6 +357,54 @@ var _ = Describe("AppWrapper Controller", func() {
318357
Expect(err).NotTo(HaveOccurred())
319358
Expect(podStatus.pending).Should(Equal(int32(1)))
320359
})
360+
361+
// Runs a two-component AppWrapper (one pod without GPUs, one requesting a
// GPU) and verifies the injected markers and Autopilot anti-affinities on
// every Pod it created.
It("Validating PodSet Injection invariants on minimal pods", func() {
	advanceToResuming(pod(100, 0, false), pod(100, 1, true))
	beginRunning()

	aw := getAppWrapper(awName)
	pods := getPods(aw)
	Expect(pods).Should(HaveLen(2))

	By("Validate expected markers and Autopilot anti-affinities were injected")
	for i := range pods {
		// Point at the slice element rather than at a per-iteration copy.
		current := &pods[i]
		Expect(current.Labels).Should(HaveKeyWithValue(AppWrapperLabel, awName.Name))
		validateMarkers(current)
		validateAutopilot(current)
	}
})
375+
376+
// Runs an AppWrapper made of two "complex" pods (pre-existing labels,
// annotations, node selector, tolerations and node affinity) and verifies
// both that the injections happened and that the user-supplied elements
// survived them.
It("Validating PodSet Injection invariants on complex pods", func() {
	advanceToResuming(complexPodYaml(), complexPodYaml())
	beginRunning()

	aw := getAppWrapper(awName)
	pods := getPods(aw)
	Expect(pods).Should(HaveLen(2))

	By("Validate expected markers and Autopilot anti-affinities were injected")
	for i := range pods {
		current := &pods[i]
		Expect(current.Labels).Should(HaveKeyWithValue(AppWrapperLabel, awName.Name))
		validateMarkers(current)
		validateAutopilot(current)
	}

	By("Validate complex pod elements were not removed")
	expectedToleration := v1.Toleration{Key: "myComplexKey", Value: "myComplexValue", Operator: v1.TolerationOpEqual, Effect: v1.TaintEffectNoSchedule}
	for i := range pods {
		current := &pods[i]
		Expect(current.Labels).Should(HaveKeyWithValue("myComplexLabel", "myComplexValue"))
		Expect(current.Annotations).Should(HaveKeyWithValue("myComplexAnnotation", "myComplexValue"))
		Expect(current.Spec.NodeSelector).Should(HaveKeyWithValue("myComplexSelector", "myComplexValue"))
		Expect(current.Spec.Tolerations).Should(ContainElement(expectedToleration))

		// The original hostname anti-affinity must still be in the first term.
		exprs := current.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
		sawHostname := false
		for _, expr := range exprs {
			if expr.Key != "kubernetes.io/hostname" {
				continue
			}
			Expect(expr.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
			Expect(expr.Values).Should(ContainElement("badHost1"))
			sawHostname = true
		}
		Expect(sawHostname).Should(BeTrue())
	}
})
321408
})
322409

323410
var _ = Describe("AppWrapper Annotations", func() {
@@ -433,5 +520,4 @@ var _ = Describe("AppWrapper Annotations", func() {
433520
Expect(awReconciler.terminalExitCodes(ctx, aw)).Should(Equal([]int{3, 10, 42}))
434521
Expect(awReconciler.retryableExitCodes(ctx, aw)).Should(Equal([]int{10, 20}))
435522
})
436-
437523
})

internal/controller/appwrapper/fixtures_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,19 @@ func getNode(name string) *v1.Node {
6969
return node
7070
}
7171

72+
func getPods(aw *workloadv1beta2.AppWrapper) []v1.Pod {
73+
result := []v1.Pod{}
74+
podList := &v1.PodList{}
75+
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: aw.Namespace})
76+
Expect(err).NotTo(HaveOccurred())
77+
for _, pod := range podList.Items {
78+
if awn, found := pod.Labels[AppWrapperLabel]; found && awn == aw.Name {
79+
result = append(result, pod)
80+
}
81+
}
82+
return result
83+
}
84+
7285
// envTest doesn't have a Pod controller; so simulate it
7386
func setPodStatus(aw *workloadv1beta2.AppWrapper, phase v1.PodPhase, numToChange int32) error {
7487
podList := &v1.PodList{}
@@ -128,6 +141,54 @@ func pod(milliCPU int64, numGPU int64, declarePodSets bool) workloadv1beta2.AppW
128141
return *awc
129142
}
130143

144+
const complexPodYAML = `
145+
apiVersion: v1
146+
kind: Pod
147+
metadata:
148+
name: %v
149+
labels:
150+
myComplexLabel: myComplexValue
151+
annotations:
152+
myComplexAnnotation: myComplexValue
153+
spec:
154+
restartPolicy: Never
155+
nodeSelector:
156+
myComplexSelector: myComplexValue
157+
affinity:
158+
nodeAffinity:
159+
requiredDuringSchedulingIgnoredDuringExecution:
160+
nodeSelectorTerms:
161+
- matchExpressions:
162+
- key: kubernetes.io/hostname
163+
operator: NotIn
164+
values:
165+
- badHost1
166+
tolerations:
167+
- key: myComplexKey
168+
value: myComplexValue
169+
operator: Equal
170+
effect: NoSchedule
171+
containers:
172+
- name: busybox
173+
image: quay.io/project-codeflare/busybox:1.36
174+
command: ["sh", "-c", "sleep 10"]
175+
resources:
176+
requests:
177+
cpu: 100m
178+
nvidia.com/gpu: 1
179+
limits:
180+
nvidia.com/gpu: 1`
181+
182+
func complexPodYaml() workloadv1beta2.AppWrapperComponent {
183+
yamlString := fmt.Sprintf(complexPodYAML, randName("pod"))
184+
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
185+
Expect(err).NotTo(HaveOccurred())
186+
awc := &workloadv1beta2.AppWrapperComponent{
187+
Template: runtime.RawExtension{Raw: jsonBytes},
188+
}
189+
return *awc
190+
}
191+
131192
const malformedPodYAML = `
132193
apiVersion: v1
133194
kind: Pod

internal/controller/appwrapper/resource_management.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func hasResourceRequest(spec map[string]interface{}, resource string) bool {
108108
return false
109109
}
110110

111-
func addNodeSelectorsToAffinity(spec map[string]interface{}, selectorTerms []v1.NodeSelectorTerm) error {
111+
func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.NodeSelectorRequirement) error {
112112
if _, ok := spec["affinity"]; !ok {
113113
spec["affinity"] = map[string]interface{}{}
114114
}
@@ -131,24 +131,37 @@ func addNodeSelectorsToAffinity(spec map[string]interface{}, selectorTerms []v1.
131131
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution is not a map")
132132
}
133133
if _, ok := nodeSelector["nodeSelectorTerms"]; !ok {
134-
nodeSelector["nodeSelectorTerms"] = []interface{}{}
134+
nodeSelector["nodeSelectorTerms"] = []interface{}{map[string]interface{}{}}
135135
}
136136
existingTerms, ok := nodeSelector["nodeSelectorTerms"].([]interface{})
137137
if !ok {
138138
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms is not an array")
139139
}
140-
for _, termToAdd := range selectorTerms {
141-
bytes, err := json.Marshal(termToAdd)
142-
if err != nil {
143-
return fmt.Errorf("marshalling selectorTerm %v: %w", termToAdd, err)
140+
for idx, term := range existingTerms {
141+
selTerm, ok := term.(map[string]interface{})
142+
if !ok {
143+
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v] is not an map", idx)
144144
}
145-
var obj interface{}
146-
if err = json.Unmarshal(bytes, &obj); err != nil {
147-
return fmt.Errorf("unmarshalling selectorTerm %v: %w", termToAdd, err)
145+
if _, ok := selTerm["matchExpressions"]; !ok {
146+
selTerm["matchExpressions"] = []interface{}{}
147+
}
148+
matchExpressions, ok := selTerm["matchExpressions"].([]interface{})
149+
if !ok {
150+
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v].matchExpressions is not an map", idx)
151+
}
152+
for _, expr := range exprsToAdd {
153+
bytes, err := json.Marshal(expr)
154+
if err != nil {
155+
return fmt.Errorf("marshalling selectorTerm %v: %w", expr, err)
156+
}
157+
var obj interface{}
158+
if err = json.Unmarshal(bytes, &obj); err != nil {
159+
return fmt.Errorf("unmarshalling selectorTerm %v: %w", expr, err)
160+
}
161+
matchExpressions = append(matchExpressions, obj)
148162
}
149-
existingTerms = append(existingTerms, obj)
163+
selTerm["matchExpressions"] = matchExpressions
150164
}
151-
nodeSelector["nodeSelectorTerms"] = existingTerms
152165

153166
return nil
154167
}
@@ -262,13 +275,11 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
262275
}
263276
}
264277
if len(toAdd) > 0 {
265-
nodeSelectors := []v1.NodeSelectorTerm{}
278+
matchExpressions := []v1.NodeSelectorRequirement{}
266279
for k, v := range toAdd {
267-
nodeSelectors = append(nodeSelectors, v1.NodeSelectorTerm{
268-
MatchExpressions: []v1.NodeSelectorRequirement{{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v}},
269-
})
280+
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v})
270281
}
271-
if err := addNodeSelectorsToAffinity(spec, nodeSelectors); err != nil {
282+
if err := addNodeSelectorsToAffinity(spec, matchExpressions); err != nil {
272283
log.FromContext(ctx).Error(err, "failed to inject Autopilot affinities")
273284
}
274285
}

0 commit comments

Comments
 (0)