Skip to content

Commit 33b61fa

Browse files
committed
Restructure to use channels instead of a pull model
Signed-off-by: Vince Prignano <vincepri@vmware.com>
1 parent cc41b65 commit 33b61fa

File tree

4 files changed

+88
-146
lines changed

4 files changed

+88
-146
lines changed

pkg/manager/internal.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (cm *controllerManager) add(r Runnable) error {
189189
if err := cm.SetFields(r); err != nil {
190190
return err
191191
}
192-
return cm.runnables.Add(r, nil)
192+
return cm.runnables.Add(r)
193193
}
194194

195195
// Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
@@ -456,21 +456,21 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
456456
// WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition
457457
// between conversion webhooks and the cache sync (usually initial list) which causes the webhooks
458458
// to never start because no cache can be populated.
459-
if err := cm.runnables.Webhooks.StartAndWaitReady(cm.internalCtx); err != nil {
459+
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
460460
if err != wait.ErrWaitTimeout {
461461
return err
462462
}
463463
}
464464

465465
// Start and wait for caches.
466-
if err := cm.runnables.Caches.StartAndWaitReady(cm.internalCtx); err != nil {
466+
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
467467
if err != wait.ErrWaitTimeout {
468468
return err
469469
}
470470
}
471471

472472
// Start the non-leaderelection Runnables after the cache has synced.
473-
if err := cm.runnables.Others.StartAndWaitReady(cm.internalCtx); err != nil {
473+
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
474474
if err != wait.ErrWaitTimeout {
475475
return err
476476
}
@@ -601,7 +601,7 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
601601
}
602602

603603
func (cm *controllerManager) startLeaderElectionRunnables() error {
604-
return cm.runnables.LeaderElection.StartAndWaitReady(cm.internalCtx)
604+
return cm.runnables.LeaderElection.Start(cm.internalCtx)
605605
}
606606

607607
func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {

pkg/manager/manager_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1817,9 +1817,6 @@ func (c *startSignalingInformer) Start(ctx context.Context) error {
18171817

18181818
func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
18191819
defer func() {
1820-
for !c.started() {
1821-
continue
1822-
}
18231820
c.mu.Lock()
18241821
c.wasSynced = true
18251822
c.mu.Unlock()

pkg/manager/runnable_group.go

Lines changed: 69 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import (
44
"context"
55
"errors"
66
"sync"
7-
"time"
87

9-
"k8s.io/apimachinery/pkg/util/wait"
108
"sigs.k8s.io/controller-runtime/pkg/webhook"
119
)
1210

@@ -18,8 +16,8 @@ var (
1816
// a ready check.
1917
type readyRunnable struct {
2018
Runnable
21-
Check runnableCheck
22-
Ready bool
19+
Check runnableCheck
20+
signalReady bool
2321
}
2422

2523
// runnableCheck can be passed to Add() to let the runnable group determine that a
@@ -46,33 +44,27 @@ func newRunnables(errChan chan error) *runnables {
4644
}
4745
}
4846

49-
// Add adds a runnable and its ready check to the closest
50-
// group of runnable that they belong to.
47+
// Add adds a runnable to closest group of runnable that they belong to.
5148
//
5249
// Add should be able to be called before and after Start, but not after StopAndWait.
5350
// Add should return an error when called during StopAndWait.
5451
// The runnables added before Start are started when Start is called.
5552
// The runnables added after Start are started directly.
56-
func (r *runnables) Add(fn Runnable, ready runnableCheck) error {
57-
if ready == nil {
58-
// If we don't have a readiness check, always return true.
59-
ready = func(_ context.Context) bool { return true }
60-
}
61-
53+
func (r *runnables) Add(fn Runnable) error {
6254
switch runnable := fn.(type) {
6355
case hasCache:
6456
return r.Caches.Add(fn, func(ctx context.Context) bool {
65-
return ready(ctx) && runnable.GetCache().WaitForCacheSync(ctx)
57+
return runnable.GetCache().WaitForCacheSync(ctx)
6658
})
6759
case *webhook.Server:
68-
return r.Webhooks.Add(fn, ready)
60+
return r.Webhooks.Add(fn, nil)
6961
case LeaderElectionRunnable:
7062
if !runnable.NeedLeaderElection() {
71-
return r.Others.Add(fn, ready)
63+
return r.Others.Add(fn, nil)
7264
}
73-
return r.LeaderElection.Add(fn, ready)
65+
return r.LeaderElection.Add(fn, nil)
7466
default:
75-
return r.LeaderElection.Add(fn, ready)
67+
return r.LeaderElection.Add(fn, nil)
7668
}
7769
}
7870

@@ -85,9 +77,11 @@ type runnableGroup struct {
8577
ctx context.Context
8678
cancel context.CancelFunc
8779

88-
start sync.Mutex
89-
startOnce sync.Once
90-
started bool
80+
start sync.Mutex
81+
startOnce sync.Once
82+
started bool
83+
startQueue []*readyRunnable
84+
startReadyCh chan *readyRunnable
9185

9286
stop sync.RWMutex
9387
stopOnce sync.Once
@@ -104,23 +98,14 @@ type runnableGroup struct {
10498
// wg is an internal sync.WaitGroup that allows us to properly stop
10599
// and wait for all the runnables to finish before returning.
106100
wg *sync.WaitGroup
107-
108-
// group is a sync.Map that contains every runnable ever.
109-
// The key of the map is the runnable itself (key'd by pointer),
110-
// while the value is its ready state.
111-
//
112-
// The group of runnable is append-only, runnables scheduled
113-
// through this group are going to be stored in this internal map
114-
// until the application exits. The limit is the available memory.
115-
group *sync.Map
116101
}
117102

118103
func newRunnableGroup(errChan chan error) *runnableGroup {
119104
r := &runnableGroup{
120-
errChan: errChan,
121-
ch: make(chan *readyRunnable),
122-
wg: new(sync.WaitGroup),
123-
group: new(sync.Map),
105+
startReadyCh: make(chan *readyRunnable),
106+
errChan: errChan,
107+
ch: make(chan *readyRunnable),
108+
wg: new(sync.WaitGroup),
124109
}
125110
r.ctx, r.cancel = context.WithCancel(context.Background())
126111
return r
@@ -133,25 +118,57 @@ func (r *runnableGroup) Started() bool {
133118
return r.started
134119
}
135120

136-
// StartAndWaitReady starts all the runnables previously
137-
// added to the group and waits for all to report ready.
138-
func (r *runnableGroup) StartAndWaitReady(ctx context.Context) error {
139-
r.Start()
140-
return r.WaitReady(ctx)
141-
}
121+
// Start starts the group and waits for all
122+
// initially registered runnables to start.
123+
// It can only be called once, subsequent calls have no effect.
124+
func (r *runnableGroup) Start(ctx context.Context) error {
125+
var retErr error
142126

143-
// Start starts the group, it can only be called once.
144-
func (r *runnableGroup) Start() {
145127
r.startOnce.Do(func() {
128+
defer close(r.startReadyCh)
129+
130+
// Start the internal reconciler.
146131
go r.reconcile()
132+
133+
// Start the group and queue up all
134+
// the runnables that were added prior.
147135
r.start.Lock()
148136
r.started = true
149-
r.group.Range(func(key, _ interface{}) bool {
150-
r.ch <- key.(*readyRunnable)
151-
return true
152-
})
137+
for _, rn := range r.startQueue {
138+
rn.signalReady = true
139+
r.ch <- rn
140+
}
153141
r.start.Unlock()
142+
143+
// If we don't have any queue, return.
144+
if len(r.startQueue) == 0 {
145+
return
146+
}
147+
148+
// Wait for all runnables to signal.
149+
for {
150+
select {
151+
case <-ctx.Done():
152+
if err := ctx.Err(); !errors.Is(err, context.Canceled) {
153+
retErr = err
154+
}
155+
case rn := <-r.startReadyCh:
156+
for i, existing := range r.startQueue {
157+
if existing == rn {
158+
// Remove the item from the start queue.
159+
r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
160+
break
161+
}
162+
}
163+
// We're done waiting if the queue is empty, return.
164+
if len(r.startQueue) == 0 {
165+
return
166+
}
167+
}
168+
}
154169
})
170+
171+
return retErr
155172
}
156173

157174
// reconcile is our main entrypoint for every runnable added
@@ -185,26 +202,17 @@ func (r *runnableGroup) reconcile() {
185202
go func(rn *readyRunnable) {
186203
go func() {
187204
if rn.Check(r.ctx) {
188-
r.group.Store(rn, true)
205+
if rn.signalReady {
206+
r.startReadyCh <- rn
207+
}
189208
}
190209
}()
191210

192211
// If we return, the runnable ended cleanly
193212
// or returned an error to the channel.
194213
//
195-
// We should always decrement the WaitGroup and
196-
// mark the runnable as ready.
197-
//
198-
// Think about the group as an append-only system.
199-
//
200-
// A runnable is marked as ready if:
201-
// - The health check return true.
202-
// - The runnable Start() method returned and
203-
// it either finished cleanly (e.g. one shot operations)
204-
// or it failed to run and it returned an error which
205-
// gets propagated to the manager.
214+
// We should always decrement the WaitGroup here.
206215
defer r.wg.Done()
207-
defer r.group.Store(rn, true)
208216

209217
// Start the runnable.
210218
if err := rn.Start(r.ctx); err != nil {
@@ -214,27 +222,6 @@ func (r *runnableGroup) reconcile() {
214222
}
215223
}
216224

217-
// WaitReady polls until the group is ready or until the context is cancelled.
218-
func (r *runnableGroup) WaitReady(ctx context.Context) error {
219-
return wait.PollImmediateInfiniteWithContext(ctx,
220-
100*time.Millisecond,
221-
func(_ context.Context) (bool, error) {
222-
if !r.Started() {
223-
return false, nil
224-
}
225-
ready, total := 0, 0
226-
r.group.Range(func(_, value interface{}) bool {
227-
total++
228-
if rd, ok := value.(bool); ok && rd {
229-
ready++
230-
}
231-
return true
232-
})
233-
return ready == total, nil
234-
},
235-
)
236-
}
237-
238225
// Add should be able to be called before and after Start, but not after StopAndWait.
239226
// Add should return an error when called during StopAndWait.
240227
func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
@@ -261,11 +248,10 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
261248
{
262249
r.start.Lock()
263250

264-
// Store the runnable in the internal buffer.
265-
r.group.Store(readyRunnable, false)
266-
267251
// Check if we're already started.
268252
if !r.started {
253+
// Store the runnable in the internal if not.
254+
r.startQueue = append(r.startQueue, readyRunnable)
269255
r.start.Unlock()
270256
return nil
271257
}
@@ -283,7 +269,7 @@ func (r *runnableGroup) StopAndWait(ctx context.Context) {
283269
// Close the reconciler channel once we're done.
284270
defer close(r.ch)
285271

286-
r.Start()
272+
_ = r.Start(ctx)
287273
r.stop.Lock()
288274
// Store the stopped variable so we don't accept any new
289275
// runnables for the time being.

0 commit comments

Comments
 (0)