Skip to content

Commit 6520a4d

Browse files
authored
Use double buffers in EventLoop (#270)
1 parent 3ff0ced commit 6520a4d

File tree

3 files changed

+82
-25
lines changed

3 files changed

+82
-25
lines changed

internal/events/events_suit_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
. "github.com/onsi/gomega"
88
)
99

10-
func TestState(t *testing.T) {
10+
func TestEvents(t *testing.T) {
1111
RegisterFailHandler(Fail)
1212
RunSpecs(t, "Events Suite")
1313
}

internal/events/events_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package events
2+
3+
import (
4+
"testing"
5+
6+
"github.com/google/go-cmp/cmp"
7+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
8+
)
9+
10+
func TestEventLoop_SwapBatches(t *testing.T) {
11+
eventLoop := NewEventLoop(nil, zap.New(), nil, nil)
12+
13+
eventLoop.currentBatch = EventBatch{
14+
"event0",
15+
"event1",
16+
"event2",
17+
}
18+
19+
nextBatch := EventBatch{
20+
"event3",
21+
"event4",
22+
"event5",
23+
"event6",
24+
}
25+
26+
eventLoop.nextBatch = nextBatch
27+
28+
eventLoop.swapBatches()
29+
30+
if l := len(eventLoop.currentBatch); l != 4 {
31+
t.Errorf("EventLoop.swapBatches() mismatch. Expected 4 events in the current batch, got %d", l)
32+
}
33+
34+
if diff := cmp.Diff(eventLoop.currentBatch, nextBatch); diff != "" {
35+
t.Errorf("EventLoop.swapBatches() mismatch on current batch events (-want +got):\n%s", diff)
36+
}
37+
38+
if l := len(eventLoop.nextBatch); l != 0 {
39+
t.Errorf("EventLoop.swapBatches() mismatch. Expected 0 events in the next batch, got %d", l)
40+
}
41+
42+
if c := cap(eventLoop.nextBatch); c != 3 {
43+
t.Errorf("EventLoop.swapBatches() mismatch. Expected capacity of 3 in the next batch, got %d", c)
44+
}
45+
}

internal/events/loop.go

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ type EventLoop struct {
2626
preparer FirstEventBatchPreparer
2727
eventCh <-chan interface{}
2828
logger logr.Logger
29+
30+
// The EventLoop uses double buffering to handle event batch processing.
31+
// The goroutine that handles the batch will always read from the currentBatch slice.
32+
// While the current batch is being handled, new events are added to the nextBatch slice.
33+
// The batches are swapped before starting the handler goroutine.
34+
currentBatch EventBatch
35+
nextBatch EventBatch
2936
}
3037

3138
// NewEventLoop creates a new EventLoop.
@@ -36,37 +43,38 @@ func NewEventLoop(
3643
preparer FirstEventBatchPreparer,
3744
) *EventLoop {
3845
return &EventLoop{
39-
eventCh: eventCh,
40-
logger: logger,
41-
handler: handler,
42-
preparer: preparer,
46+
eventCh: eventCh,
47+
logger: logger,
48+
handler: handler,
49+
preparer: preparer,
50+
currentBatch: make(EventBatch, 0),
51+
nextBatch: make(EventBatch, 0),
4352
}
4453
}
4554

4655
// Start starts the EventLoop.
4756
// This method will block until the EventLoop stops, which will happen after the ctx is closed.
4857
func (el *EventLoop) Start(ctx context.Context) error {
49-
// The current batch.
50-
var batch EventBatch
5158
// handling tells if any batch is currently being handled.
5259
var handling bool
5360
// handlingDone is used to signal the completion of handling a batch.
5461
handlingDone := make(chan struct{})
5562

56-
handleAndResetBatch := func() {
63+
handleBatch := func() {
5764
go func(batch EventBatch) {
5865
el.logger.Info("Handling events from the batch", "total", len(batch))
5966

6067
el.handler.HandleEventBatch(ctx, batch)
6168

6269
el.logger.Info("Finished handling the batch")
6370
handlingDone <- struct{}{}
64-
}(batch)
71+
}(el.currentBatch)
72+
}
6573

66-
// FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations.
67-
// Use a double-buffer approach - create two buffers and exchange them between the producer and consumer
68-
// routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity.
69-
batch = make([]interface{}, 0)
74+
swapAndHandleBatch := func() {
75+
el.swapBatches()
76+
handleBatch()
77+
handling = true
7078
}
7179

7280
// Prepare the fist event batch, which includes the UpsertEvents for all relevant cluster resources.
@@ -81,13 +89,13 @@ func (el *EventLoop) Start(ctx context.Context) error {
8189
// not trigger any reconfiguration after receiving an upsert for an existing resource with the same Generation.
8290

8391
var err error
84-
batch, err = el.preparer.Prepare(ctx)
92+
el.currentBatch, err = el.preparer.Prepare(ctx)
8593
if err != nil {
8694
return fmt.Errorf("failed to prepare the first batch: %w", err)
8795
}
8896

8997
// Handle the first batch
90-
handleAndResetBatch()
98+
handleBatch()
9199
handling = true
92100

93101
// Note: at any point of time, no more than one batch is currently being handled.
@@ -103,28 +111,32 @@ func (el *EventLoop) Start(ctx context.Context) error {
103111
return nil
104112
case e := <-el.eventCh:
105113
// Add the event to the current batch.
106-
batch = append(batch, e)
114+
el.nextBatch = append(el.nextBatch, e)
107115

108116
// FIXME(pleshakov): Log more details about the event like resource GVK and ns/name.
109117
el.logger.Info(
110-
"added an event to the current batch",
118+
"added an event to the next batch",
111119
"type", fmt.Sprintf("%T", e),
112-
"total", len(batch),
120+
"total", len(el.nextBatch),
113121
)
114122

115-
// Handle the current batch if no batch is being handled.
123+
// If no batch is currently being handled, swap batches and begin handling the batch.
116124
if !handling {
117-
handleAndResetBatch()
118-
handling = true
125+
swapAndHandleBatch()
119126
}
120127
case <-handlingDone:
121128
handling = false
122129

123-
// Handle the current batch if it has at least one event.
124-
if len(batch) > 0 {
125-
handleAndResetBatch()
126-
handling = true
130+
// If there's at least one event in the next batch, swap batches and begin handling the batch.
131+
if len(el.nextBatch) > 0 {
132+
swapAndHandleBatch()
127133
}
128134
}
129135
}
130136
}
137+
138+
// swapBatches swaps the current and next batches.
139+
func (el *EventLoop) swapBatches() {
140+
el.currentBatch, el.nextBatch = el.nextBatch, el.currentBatch
141+
el.nextBatch = el.nextBatch[:0]
142+
}

0 commit comments

Comments
 (0)