diff --git a/internal/events/events_suit_test.go b/internal/events/events_suit_test.go index 54853b6397..1c318a89b4 100644 --- a/internal/events/events_suit_test.go +++ b/internal/events/events_suit_test.go @@ -7,7 +7,7 @@ import ( . "github.com/onsi/gomega" ) -func TestState(t *testing.T) { +func TestEvents(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Events Suite") } diff --git a/internal/events/events_test.go b/internal/events/events_test.go new file mode 100644 index 0000000000..a32d6dc974 --- /dev/null +++ b/internal/events/events_test.go @@ -0,0 +1,45 @@ +package events + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func TestEventLoop_SwapBatches(t *testing.T) { + eventLoop := NewEventLoop(nil, zap.New(), nil, nil) + + eventLoop.currentBatch = EventBatch{ + "event0", + "event1", + "event2", + } + + nextBatch := EventBatch{ + "event3", + "event4", + "event5", + "event6", + } + + eventLoop.nextBatch = nextBatch + + eventLoop.swapBatches() + + if l := len(eventLoop.currentBatch); l != 4 { + t.Errorf("EventLoop.swapBatches() mismatch. Expected 4 events in the current batch, got %d", l) + } + + if diff := cmp.Diff(eventLoop.currentBatch, nextBatch); diff != "" { + t.Errorf("EventLoop.swapBatches() mismatch on current batch events (-want +got):\n%s", diff) + } + + if l := len(eventLoop.nextBatch); l != 0 { + t.Errorf("EventLoop.swapBatches() mismatch. Expected 0 events in the next batch, got %d", l) + } + + if c := cap(eventLoop.nextBatch); c != 3 { + t.Errorf("EventLoop.swapBatches() mismatch. Expected capacity of 3 in the next batch, got %d", c) + } +} diff --git a/internal/events/loop.go b/internal/events/loop.go index ba15d2a7ae..10be12ed5b 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -26,6 +26,13 @@ type EventLoop struct { preparer FirstEventBatchPreparer eventCh <-chan interface{} logger logr.Logger + + // The EventLoop uses double buffering to handle event batch processing. + // The goroutine that handles the batch will always read from the currentBatch slice. + // While the current batch is being handled, new events are added to the nextBatch slice. + // The batches are swapped before starting the handler goroutine. + currentBatch EventBatch + nextBatch EventBatch } // NewEventLoop creates a new EventLoop. @@ -36,24 +43,24 @@ func NewEventLoop( preparer FirstEventBatchPreparer, ) *EventLoop { return &EventLoop{ - eventCh: eventCh, - logger: logger, - handler: handler, - preparer: preparer, + eventCh: eventCh, + logger: logger, + handler: handler, + preparer: preparer, + currentBatch: make(EventBatch, 0), + nextBatch: make(EventBatch, 0), } } // Start starts the EventLoop. // This method will block until the EventLoop stops, which will happen after the ctx is closed. func (el *EventLoop) Start(ctx context.Context) error { - // The current batch. - var batch EventBatch // handling tells if any batch is currently being handled. var handling bool // handlingDone is used to signal the completion of handling a batch. handlingDone := make(chan struct{}) - handleAndResetBatch := func() { + handleBatch := func() { go func(batch EventBatch) { el.logger.Info("Handling events from the batch", "total", len(batch)) @@ -61,12 +68,13 @@ func (el *EventLoop) Start(ctx context.Context) error { el.logger.Info("Finished handling the batch") handlingDone <- struct{}{} - }(batch) + }(el.currentBatch) + } - // FIXME(pleshakov): Making an entirely new buffer is inefficient and multiplies memory operations. - // Use a double-buffer approach - create two buffers and exchange them between the producer and consumer - // routines. NOTE: pass-by-reference, and reset buffer to length 0, but retain capacity. - batch = make([]interface{}, 0) + swapAndHandleBatch := func() { + el.swapBatches() + handleBatch() + handling = true } // Prepare the fist event batch, which includes the UpsertEvents for all relevant cluster resources. @@ -81,13 +89,13 @@ func (el *EventLoop) Start(ctx context.Context) error { // not trigger any reconfiguration after receiving an upsert for an existing resource with the same Generation. var err error - batch, err = el.preparer.Prepare(ctx) + el.currentBatch, err = el.preparer.Prepare(ctx) if err != nil { return fmt.Errorf("failed to prepare the first batch: %w", err) } // Handle the first batch - handleAndResetBatch() + handleBatch() handling = true // Note: at any point of time, no more than one batch is currently being handled. @@ -103,28 +111,32 @@ func (el *EventLoop) Start(ctx context.Context) error { return nil case e := <-el.eventCh: // Add the event to the current batch. - batch = append(batch, e) + el.nextBatch = append(el.nextBatch, e) // FIXME(pleshakov): Log more details about the event like resource GVK and ns/name. el.logger.Info( - "added an event to the current batch", + "added an event to the next batch", "type", fmt.Sprintf("%T", e), - "total", len(batch), + "total", len(el.nextBatch), ) - // Handle the current batch if no batch is being handled. + // If no batch is currently being handled, swap batches and begin handling the batch. if !handling { - handleAndResetBatch() - handling = true + swapAndHandleBatch() } case <-handlingDone: handling = false - // Handle the current batch if it has at least one event. - if len(batch) > 0 { - handleAndResetBatch() - handling = true + // If there's at least one event in the next batch, swap batches and begin handling the batch. + if len(el.nextBatch) > 0 { + swapAndHandleBatch() } } } } + +// swapBatches swaps the current and next batches. +func (el *EventLoop) swapBatches() { + el.currentBatch, el.nextBatch = el.nextBatch, el.currentBatch + el.nextBatch = el.nextBatch[:0] +}