Skip to content

Ensure podAnnotations are removed from pods if reset in the config #2826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,17 @@ type Cluster struct {
}

type compareStatefulsetResult struct {
match bool
replace bool
rollingUpdate bool
reasons []string
match bool
replace bool
rollingUpdate bool
reasons []string
deletedPodAnnotations []string
}

type compareLogicalBackupJobResult struct {
match bool
reasons []string
deletedPodAnnotations []string
}

// New creates a new cluster. This function should be called from a controller.
Expand Down Expand Up @@ -431,6 +438,7 @@ func (c *Cluster) Create() (err error) {
}

func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compareStatefulsetResult {
deletedPodAnnotations := []string{}
reasons := make([]string, 0)
var match, needsRollUpdate, needsReplace bool

Expand All @@ -445,7 +453,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
needsReplace = true
reasons = append(reasons, "new statefulset's ownerReferences do not match")
}
if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations); changed {
if changed, reason := c.compareAnnotations(c.Statefulset.Annotations, statefulSet.Annotations, nil); changed {
match = false
needsReplace = true
reasons = append(reasons, "new statefulset's annotations do not match: "+reason)
Expand Down Expand Up @@ -519,7 +527,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
}
}

if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations, &deletedPodAnnotations); changed {
match = false
needsReplace = true
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
Expand All @@ -541,7 +549,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
continue
}
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations, nil); changed {
needsReplace = true
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q do not match the current ones: %s", name, reason))
}
Expand Down Expand Up @@ -579,7 +587,7 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
match = false
}

return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace}
return &compareStatefulsetResult{match: match, reasons: reasons, rollingUpdate: needsRollUpdate, replace: needsReplace, deletedPodAnnotations: deletedPodAnnotations}
}

type containerCondition func(a, b v1.Container) bool
Expand Down Expand Up @@ -781,7 +789,7 @@ func volumeMountExists(mount v1.VolumeMount, mounts []v1.VolumeMount) bool {
return false
}

func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string) {
func (c *Cluster) compareAnnotations(old, new map[string]string, removedList *[]string) (bool, string) {
reason := ""
ignoredAnnotations := make(map[string]bool)
for _, ignore := range c.OpConfig.IgnoredAnnotations {
Expand All @@ -794,6 +802,9 @@ func (c *Cluster) compareAnnotations(old, new map[string]string) (bool, string)
}
if _, ok := new[key]; !ok {
reason += fmt.Sprintf(" Removed %q.", key)
if removedList != nil {
*removedList = append(*removedList, key)
}
}
}

Expand Down Expand Up @@ -836,41 +847,46 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
return true, ""
}

func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) {
func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) *compareLogicalBackupJobResult {
deletedPodAnnotations := []string{}
reasons := make([]string, 0)
match := true

if cur.Spec.Schedule != new.Spec.Schedule {
return false, fmt.Sprintf("new job's schedule %q does not match the current one %q",
new.Spec.Schedule, cur.Spec.Schedule)
match = false
reasons = append(reasons, fmt.Sprintf("new job's schedule %q does not match the current one %q", new.Spec.Schedule, cur.Spec.Schedule))
}

newImage := new.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
curImage := cur.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
if newImage != curImage {
return false, fmt.Sprintf("new job's image %q does not match the current one %q",
newImage, curImage)
match = false
reasons = append(reasons, fmt.Sprintf("new job's image %q does not match the current one %q", newImage, curImage))
}

newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation, &deletedPodAnnotations); changed {
match = false
reasons = append(reasons, fmt.Sprint("new job's pod template metadata annotations do not match "+reason))
}

newPgVersion := getPgVersion(new)
curPgVersion := getPgVersion(cur)
if newPgVersion != curPgVersion {
return false, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q",
newPgVersion, curPgVersion)
match = false
reasons = append(reasons, fmt.Sprintf("new job's env PG_VERSION %q does not match the current one %q", newPgVersion, curPgVersion))
}

needsReplace := false
reasons := make([]string, 0)
needsReplace, reasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, reasons)
contReasons := make([]string, 0)
needsReplace, contReasons = c.compareContainers("cronjob container", cur.Spec.JobTemplate.Spec.Template.Spec.Containers, new.Spec.JobTemplate.Spec.Template.Spec.Containers, needsReplace, contReasons)
if needsReplace {
return false, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(reasons, `', '`))
match = false
reasons = append(reasons, fmt.Sprintf("logical backup container specs do not match: %v", strings.Join(contReasons, `', '`)))
}

return true, ""
return &compareLogicalBackupJobResult{match: match, reasons: reasons, deletedPodAnnotations: deletedPodAnnotations}
}

func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBudget) (bool, string) {
Expand All @@ -881,7 +897,7 @@ func (c *Cluster) comparePodDisruptionBudget(cur, new *policyv1.PodDisruptionBud
if !reflect.DeepEqual(new.ObjectMeta.OwnerReferences, cur.ObjectMeta.OwnerReferences) {
return false, "new PDB's owner references do not match the current ones"
}
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations, nil); changed {
return false, "new PDB's annotations do not match the current ones:" + reason
}
return true, ""
Expand Down Expand Up @@ -1016,7 +1032,7 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
// only when streams were not specified in oldSpec but in newSpec
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0

annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations, nil)

initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
if initUsers {
Expand Down
20 changes: 14 additions & 6 deletions pkg/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1680,12 +1680,20 @@ func TestCompareLogicalBackupJob(t *testing.T) {
}
}

match, reason := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob)
if match != tt.match {
t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), match, currentCronJob, desiredCronJob)
} else {
if !strings.HasPrefix(reason, tt.reason) {
t.Errorf("%s - expected reason prefix %s, found %s", t.Name(), tt.reason, reason)
cmp := cluster.compareLogicalBackupJob(currentCronJob, desiredCronJob)
if cmp.match != tt.match {
t.Errorf("%s - unexpected match result %t when comparing cronjobs %#v and %#v", t.Name(), cmp.match, currentCronJob, desiredCronJob)
} else if !cmp.match {
found := false
for _, reason := range cmp.reasons {
if strings.HasPrefix(reason, tt.reason) {
found = true
break
}
found = false
}
if !found {
t.Errorf("%s - expected reason prefix %s, not found in %#v", t.Name(), tt.reason, cmp.reasons)
}
}
})
Expand Down
40 changes: 33 additions & 7 deletions pkg/cluster/connection_pooler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cluster

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -977,6 +978,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
err error
)

updatedPodAnnotations := map[string]*string{}
syncReason := make([]string, 0)
deployment, err = c.KubeClient.
Deployments(c.Namespace).
Expand Down Expand Up @@ -1038,9 +1040,27 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
}

newPodAnnotations := c.annotationsSet(c.generatePodAnnotations(&c.Spec))
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations); changed {
deletedPodAnnotations := []string{}
if changed, reason := c.compareAnnotations(deployment.Spec.Template.Annotations, newPodAnnotations, &deletedPodAnnotations); changed {
specSync = true
syncReason = append(syncReason, []string{"new connection pooler's pod template annotations do not match the current ones: " + reason}...)

for _, anno := range deletedPodAnnotations {
updatedPodAnnotations[anno] = nil
}
templateMetadataReq := map[string]map[string]map[string]map[string]map[string]*string{
"spec": {"template": {"metadata": {"annotations": updatedPodAnnotations}}}}
patch, err := json.Marshal(templateMetadataReq)
if err != nil {
return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pod template: %v", role, err)
}
deployment, err = c.KubeClient.Deployments(c.Namespace).Patch(context.TODO(),
deployment.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "")
if err != nil {
c.logger.Errorf("failed to patch %s connection pooler's pod template: %v", role, err)
return nil, err
}

deployment.Spec.Template.Annotations = newPodAnnotations
}

Expand All @@ -1064,7 +1084,7 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
}

newAnnotations := c.AnnotationsToPropagate(c.annotationsSet(nil)) // including the downscaling annotations
if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations); changed {
if changed, _ := c.compareAnnotations(deployment.Annotations, newAnnotations, nil); changed {
deployment, err = patchConnectionPoolerAnnotations(c.KubeClient, deployment, newAnnotations)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1098,14 +1118,20 @@ func (c *Cluster) syncConnectionPoolerWorker(oldSpec, newSpec *acidv1.Postgresql
if err != nil {
return nil, fmt.Errorf("could not delete pooler pod: %v", err)
}
} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations); changed {
patchData, err := metaAnnotationsPatch(deployment.Spec.Template.Annotations)
} else if changed, _ := c.compareAnnotations(pod.Annotations, deployment.Spec.Template.Annotations, nil); changed {
metadataReq := map[string]map[string]map[string]*string{"metadata": {}}

for anno, val := range deployment.Spec.Template.Annotations {
updatedPodAnnotations[anno] = &val
}
metadataReq["metadata"]["annotations"] = updatedPodAnnotations
patch, err := json.Marshal(metadataReq)
if err != nil {
return nil, fmt.Errorf("could not form patch for pooler's pod annotations: %v", err)
return nil, fmt.Errorf("could not marshal ObjectMeta for %s connection pooler's pods: %v", role, err)
}
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
_, err = c.KubeClient.Pods(pod.Namespace).Patch(context.TODO(), pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return nil, fmt.Errorf("could not patch annotations for pooler's pod %q: %v", pod.Name, err)
return nil, fmt.Errorf("could not patch annotations for %s connection pooler's pod %q: %v", role, pod.Name, err)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (c *Cluster) updateService(role PostgresRole, oldService *v1.Service, newSe
}
}

if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations); changed {
if changed, _ := c.compareAnnotations(oldService.Annotations, newService.Annotations, nil); changed {
patchData, err := metaAnnotationsPatch(newService.Annotations)
if err != nil {
return nil, fmt.Errorf("could not form patch for service %q annotations: %v", oldService.Name, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.Fab
for newKey, newValue := range newEventStreams.Annotations {
desiredAnnotations[newKey] = newValue
}
if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed {
if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations, nil); changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
}
Expand Down
Loading
Loading