From 9b30f409ad4d81b1c1c32bd45971172fe9de8ad3 Mon Sep 17 00:00:00 2001 From: Kate Osborn Date: Fri, 14 Oct 2022 15:39:25 -0600 Subject: [PATCH 1/4] Use double buffers in EventLoop --- internal/events/events_suit_test.go | 2 +- internal/events/events_test.go | 45 ++++++++++++++++++++++ internal/events/loop.go | 59 ++++++++++++++++++----------- 3 files changed, 82 insertions(+), 24 deletions(-) create mode 100644 internal/events/events_test.go diff --git a/internal/events/events_suit_test.go b/internal/events/events_suit_test.go index 54853b6397..1c318a89b4 100644 --- a/internal/events/events_suit_test.go +++ b/internal/events/events_suit_test.go @@ -7,7 +7,7 @@ import ( . "github.com/onsi/gomega" ) -func TestState(t *testing.T) { +func TestEvents(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Events Suite") } diff --git a/internal/events/events_test.go b/internal/events/events_test.go new file mode 100644 index 0000000000..553d4eb586 --- /dev/null +++ b/internal/events/events_test.go @@ -0,0 +1,45 @@ +package events + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func TestEventLoop_SwapBatches(t *testing.T) { + eventLoop := NewEventLoop(nil, zap.New(), nil, nil) + + eventLoop.currentBatch = []interface{}{ + "event0", + "event1", + "event2", + } + + nextBatch := EventBatch{ + "event3", + "event4", + "event5", + "event6", + } + + eventLoop.nextBatch = nextBatch + + eventLoop.swapBatches() + + if l := len(eventLoop.currentBatch); l != 4 { + t.Errorf("EventLoop.swapBatches() mismatch. Expected 4 events in the current batch, got %d", l) + } + + if diff := cmp.Diff(eventLoop.currentBatch, nextBatch); diff != "" { + t.Errorf("EventLoop.swapBatches() mismatch on current batch events (-want +got):\n%s", diff) + } + + if l := len(eventLoop.nextBatch); l != 0 { + t.Errorf("EventLoop.swapBatches() mismatch. Expected 0 events in the next batch, got %d", l) + } + + if c := cap(eventLoop.nextBatch); c != 3 { + t.Errorf("EventLoop.swapBatches() mismatch. Expected capacity of 4 in the next batch, got %d", c) + } +} diff --git a/internal/events/loop.go b/internal/events/loop.go index ba15d2a7ae..8b2d94144a 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -26,6 +26,13 @@ type EventLoop struct { preparer FirstEventBatchPreparer eventCh <-chan interface{} logger logr.Logger + + // The EventLoop uses double buffering to handle event batch processing. + // The goroutine that handles the batch will always read from the currentBatch slice. + // While the current batch is being handled, new events are added to the nextBatch slice. + // The batches are swapped after the handler finishes with the current batch. + currentBatch EventBatch + nextBatch EventBatch } // NewEventLoop creates a new EventLoop. @@ -36,24 +43,24 @@ func NewEventLoop( preparer FirstEventBatchPreparer, ) *EventLoop { return &EventLoop{ - eventCh: eventCh, - logger: logger, - handler: handler, - preparer: preparer, + eventCh: eventCh, + logger: logger, + handler: handler, + preparer: preparer, + currentBatch: make(EventBatch, 0), + nextBatch: make(EventBatch, 0), } } // Start starts the EventLoop. // This method will block until the EventLoop stops, which will happen after the ctx is closed. func (el *EventLoop) Start(ctx context.Context) error { - // The current batch. - var batch EventBatch // handling tells if any batch is currently being handled. var handling bool // handlingDone is used to signal the completion of handling a batch. handlingDone := make(chan struct{}) - handleAndResetBatch := func() { + handleBatch := func() { go func(batch EventBatch) { el.logger.Info("Handling events from the batch", "total", len(batch)) @@ -61,12 +68,7 @@ func (el *EventLoop) Start(ctx context.Context) error { el.logger.Info("Finished handling the batch") handlingDone <- struct{}{} - }(batch) - - // FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations. - // Use a double-buffer approach - create two buffers and exchange them between the producer and consumer - // routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity. - batch = make([]interface{}, 0) + }(el.currentBatch) } // Prepare the fist event batch, which includes the UpsertEvents for all relevant cluster resources. @@ -81,13 +83,13 @@ func (el *EventLoop) Start(ctx context.Context) error { // not trigger any reconfiguration after receiving an upsert for an existing resource with the same Generation. var err error - batch, err = el.preparer.Prepare(ctx) + el.currentBatch, err = el.preparer.Prepare(ctx) if err != nil { return fmt.Errorf("failed to prepare the first batch: %w", err) } // Handle the first batch - handleAndResetBatch() + handleBatch() handling = true // Note: at any point of time, no more than one batch is currently being handled. @@ -103,28 +105,39 @@ func (el *EventLoop) Start(ctx context.Context) error { return nil case e := <-el.eventCh: // Add the event to the current batch. - batch = append(batch, e) + el.nextBatch = append(el.nextBatch, e) // FIXME(pleshakov): Log more details about the event like resource GVK and ns/name. el.logger.Info( - "added an event to the current batch", + "added an event to the next batch", "type", fmt.Sprintf("%T", e), - "total", len(batch), + "total", len(el.nextBatch), ) - // Handle the current batch if no batch is being handled. + // If no batch is currently being handled, swap batches and begin handling the batch. if !handling { - handleAndResetBatch() + el.swapBatches() + + handleBatch() handling = true } case <-handlingDone: handling = false - // Handle the current batch if it has at least one event. - if len(batch) > 0 { - handleAndResetBatch() + // If there's at least one event in the next batch, swap batches and begin handling the batch. + if len(el.nextBatch) > 0 { + el.swapBatches() + + handleBatch() handling = true } } } } + +// swapBatches swaps the current and next batches. +func (el *EventLoop) swapBatches() { + temp := el.currentBatch + el.currentBatch = el.nextBatch + el.nextBatch = temp[:0] +} From bad765359cf37f55b66b52af6aa0b4b4f43ccf16 Mon Sep 17 00:00:00 2001 From: Kate Osborn Date: Mon, 17 Oct 2022 15:07:29 -0600 Subject: [PATCH 2/4] Remove temp var in swap --- internal/events/loop.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/events/loop.go b/internal/events/loop.go index 8b2d94144a..ac62943d7d 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -137,7 +137,6 @@ func (el *EventLoop) Start(ctx context.Context) error { // swapBatches swaps the current and next batches. func (el *EventLoop) swapBatches() { - temp := el.currentBatch - el.currentBatch = el.nextBatch - el.nextBatch = temp[:0] + el.currentBatch, el.nextBatch = el.nextBatch, el.currentBatch + el.nextBatch = el.nextBatch[:0] } From b5befd82ca1f73f4f9ed6483a55b00ebb8c14e1a Mon Sep 17 00:00:00 2001 From: Kate Osborn Date: Wed, 19 Oct 2022 14:36:22 -0600 Subject: [PATCH 3/4] Address code review comments --- internal/events/events_test.go | 2 +- internal/events/loop.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/events/events_test.go b/internal/events/events_test.go index 553d4eb586..e8c727c289 100644 --- a/internal/events/events_test.go +++ b/internal/events/events_test.go @@ -40,6 +40,6 @@ func TestEventLoop_SwapBatches(t *testing.T) { } if c := cap(eventLoop.nextBatch); c != 3 { - t.Errorf("EventLoop.swapBatches() mismatch. Expected capacity of 4 in the next batch, got %d", c) + t.Errorf("EventLoop.swapBatches() mismatch. Expected capacity of 3 in the next batch, got %d", c) } } diff --git a/internal/events/loop.go b/internal/events/loop.go index ac62943d7d..0496ab8540 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -30,7 +30,7 @@ type EventLoop struct { // The EventLoop uses double buffering to handle event batch processing. // The goroutine that handles the batch will always read from the currentBatch slice. // While the current batch is being handled, new events are added to the nextBatch slice. - // The batches are swapped after the handler finishes with the current batch. + // The batches are swapped before starting the handler goroutine. currentBatch EventBatch nextBatch EventBatch } From ddd584f09a641f28b40676dd878fd9b8e1631e46 Mon Sep 17 00:00:00 2001 From: Kate Osborn Date: Mon, 7 Nov 2022 11:35:53 -0700 Subject: [PATCH 4/4] Add swapAndHandleBatch func; fix unit test --- internal/events/events_test.go | 2 +- internal/events/loop.go | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/events/events_test.go b/internal/events/events_test.go index e8c727c289..a32d6dc974 100644 --- a/internal/events/events_test.go +++ b/internal/events/events_test.go @@ -10,7 +10,7 @@ import ( func TestEventLoop_SwapBatches(t *testing.T) { eventLoop := NewEventLoop(nil, zap.New(), nil, nil) - eventLoop.currentBatch = []interface{}{ + eventLoop.currentBatch = EventBatch{ "event0", "event1", "event2", diff --git a/internal/events/loop.go b/internal/events/loop.go index 0496ab8540..10be12ed5b 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -71,6 +71,12 @@ func (el *EventLoop) Start(ctx context.Context) error { }(el.currentBatch) } + swapAndHandleBatch := func() { + el.swapBatches() + handleBatch() + handling = true + } + // Prepare the fist event batch, which includes the UpsertEvents for all relevant cluster resources. // This is necessary so that the first time the EventHandler generates NGINX configuration, it derives it from // a complete view of the cluster. Otherwise, the handler would generate incomplete configuration, which can lead @@ -116,20 +122,14 @@ func (el *EventLoop) Start(ctx context.Context) error { // If no batch is currently being handled, swap batches and begin handling the batch. if !handling { - el.swapBatches() - - handleBatch() - handling = true + swapAndHandleBatch() } case <-handlingDone: handling = false // If there's at least one event in the next batch, swap batches and begin handling the batch. if len(el.nextBatch) > 0 { - el.swapBatches() - - handleBatch() - handling = true + swapAndHandleBatch() } } }