Commit 851eccd

Manager should support retrying to start runnables with backoff
This changeset adds the ability for a Manager to not fail immediately when a wait.Backoff is given as RunnableRetryBackoff in Options. Currently, if a runnable fails to run, the Start operation is never retried, which can cause the manager and all webhooks to stop and the deployment to go into CrashLoopBackOff. Given the eventual consistency of controllers and managers cooperating with other controllers or the api-server, allow some backoff by trying to start runnables a number of times before giving up.

Signed-off-by: Vince Prignano <vincepri@vmware.com>
1 parent 0cce21b commit 851eccd
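For context, here is a minimal sketch of how a caller could opt into the new behavior once this change is available. The backoff values and the surrounding setup (manager.New, ctrl.GetConfigOrDie, ctrl.SetupSignalHandler) are illustrative and not part of the commit:

package main

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	// Retry each runnable's Start up to 5 times, waiting ~500ms between
	// attempts and doubling the wait each time, before giving up.
	backoff := wait.Backoff{
		Duration: 500 * time.Millisecond,
		Factor:   2.0,
		Jitter:   0.1,
		Steps:    5,
	}

	mgr, err := manager.New(ctrl.GetConfigOrDie(), manager.Options{
		// New field added by this commit; nil keeps the old fail-fast behavior.
		RunnableRetryBackoff: &backoff,
	})
	if err != nil {
		panic(err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}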

File tree

pkg/manager/internal.go
pkg/manager/manager.go
pkg/manager/manager_test.go

3 files changed: +98 −1 lines changed

pkg/manager/internal.go

28 additions, 1 deletion

@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/runtime"
 	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -160,6 +161,10 @@ type controllerManager struct {
 	// between tries of actions.
 	retryPeriod time.Duration
 
+	// runnableRetryBackoff, if set, instructs the manager to retry to start runnables
+	// if an error occurs and only fail after a certain amount of time.
+	runnableRetryBackoff *wait.Backoff
+
 	// waitForRunnable is holding the number of runnables currently running so that
 	// we can wait for them to exit before quitting the manager
 	waitForRunnable sync.WaitGroup
@@ -693,7 +698,29 @@ func (cm *controllerManager) startRunnable(r Runnable) {
 	cm.waitForRunnable.Add(1)
 	go func() {
 		defer cm.waitForRunnable.Done()
-		if err := r.Start(cm.internalCtx); err != nil {
+
+		// If there is no retry backoff, keep the old logic and return right away.
+		if cm.runnableRetryBackoff == nil {
+			if err := r.Start(cm.internalCtx); err != nil {
+				cm.errChan <- err
+			}
+			return
+		}
+
+		// If a retry backoff is set, call Start multiple times until it
+		// either succeeds or the backoff expires.
+		var lastError error
+		if err := wait.ExponentialBackoffWithContext(cm.internalCtx, *cm.runnableRetryBackoff, func() (bool, error) {
+			if err := r.Start(cm.internalCtx); err != nil {
+				lastError = err
+				return false, nil
+			}
+			return true, nil
+		}); err != nil {
+			if lastError != nil {
+				cm.errChan <- fmt.Errorf("failed to run runnable, %s: %w", err, lastError)
+				return
+			}
 			cm.errChan <- err
 		}
 	}()
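The retry loop above relies on wait.ExponentialBackoffWithContext: the condition returns (false, nil) to request another attempt, (true, nil) to stop, and the call itself returns an error once the backoff steps are exhausted or the context is cancelled, which is why lastError is captured and wrapped. Below is a standalone sketch of those semantics, matching the condition signature used in this hunk (newer apimachinery releases instead pass the context into the condition function):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	backoff := wait.Backoff{Duration: 10 * time.Millisecond, Factor: 2.0, Steps: 4}

	// A hypothetical operation that fails twice before succeeding.
	attempts := 0
	flaky := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("not ready")
		}
		return nil
	}

	var lastError error
	err := wait.ExponentialBackoffWithContext(context.Background(), backoff, func() (bool, error) {
		if err := flaky(); err != nil {
			lastError = err
			return false, nil // ask for another attempt after the next backoff step
		}
		return true, nil // success: stop retrying
	})
	// err is non-nil only if all steps were used up (or the context ended)
	// before the condition succeeded.
	fmt.Println(attempts, err, lastError) // 3 <nil> not ready
}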

pkg/manager/manager.go

6 additions, 0 deletions

@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/client-go/tools/record"
@@ -264,6 +265,10 @@ type Options struct {
 	// +optional
 	Controller v1alpha1.ControllerConfigurationSpec
 
+	// RunnableRetryBackoff, if set, instructs the manager to retry to start runnables
+	// if an error occurs and only fail after a certain amount of time.
+	RunnableRetryBackoff *wait.Backoff
+
 	// makeBroadcaster allows deferring the creation of the broadcaster to
 	// avoid leaking goroutines if we never call Start on this manager. It also
 	// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -388,6 +393,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
 		internalProceduresStop:        make(chan struct{}),
 		leaderElectionStopped:         make(chan struct{}),
 		leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
+		runnableRetryBackoff:          options.RunnableRetryBackoff,
 	}, nil
 }

pkg/manager/manager_test.go

64 additions, 0 deletions

@@ -37,6 +37,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	configv1alpha1 "k8s.io/component-base/config/v1alpha1"
@@ -999,6 +1000,69 @@ var _ = Describe("manger.Manager", func() {
 			<-runnableStopped
 		})
 
+		It("should wait for runnables if exponential backoff is set", func() {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+			m.(*controllerManager).runnableRetryBackoff = &wait.Backoff{
+				Duration: 10 * time.Millisecond,
+				Steps:    5,
+				Jitter:   1.0,
+			}
+
+			called := 0
+			runnableStopped := make(chan struct{})
+			now := time.Now()
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				called++
+				if time.Now().Sub(now).Milliseconds() > 30 {
+					close(runnableStopped)
+					return nil
+				}
+				return errors.New("not yet")
+			}))).ToNot(HaveOccurred())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			managerStopDone := make(chan struct{})
+			go func() {
+				Expect(m.Start(ctx)).NotTo(HaveOccurred())
+				close(managerStopDone)
+			}()
+			<-runnableStopped
+			<-m.(*controllerManager).elected
+			cancel()
+
+			Expect(called).To(BeNumerically(">=", 1))
+		})
+
+		It("should error if a runnable takes too long to run and backoff is enabled", func() {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+			m.(*controllerManager).runnableRetryBackoff = &wait.Backoff{
+				Duration: 10 * time.Millisecond,
+				Steps:    5,
+				Jitter:   1.0,
+			}
+
+			now := time.Now()
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				if time.Now().Sub(now).Milliseconds() > 100 {
+					return nil
+				}
+				return errors.New("not yet")
+			}))).ToNot(HaveOccurred())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			err = m.Start(ctx)
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("not yet"))
+			cancel()
+		})
 	}
 
 	Context("with defaults", func() {
