Skip to content

Commit 7f831b5

Browse files
authored
Batch events (#175)
Previously, the Gateway would handle one event (upserting or deleting a Kubernetes resource) at a time. This commit introduces event batching: multiple events (a batch) are handled at once. Batching is needed because handling an event (or multiple events at once) will typically result in reloading NGINX, which is the operation we want to minimize, for the following reasons: (1) A reload takes time - at least 200ms. The time depends on the size of the configuration, including the number of TLS certs, and on the available CPU cycles. (2) A reload can have side-effects for the data plane traffic. Now, when a new event arrives, there are two cases: - If there are no events currently being handled, the new event is handled immediately. - Otherwise, the new event will be saved for later handling. All saved events will be handled after the handling of the current event(s) finishes. Multiple saved events will be handled at once in one batch. Additional implementation notes: (a) The EventLoop was split into two parts: (1) The EventHandler, which is only responsible for handling a batch of events, without dealing with concurrency. (2) The stripped-down EventLoop, which is responsible for batching events and propagating batches to the EventHandler in a dedicated goroutine. (b) The ChangeProcessor was fixed, so that when multiple changes are captured (coming from a single batch) -- first, changes that cause data plane reconfiguration, followed by changes that do not cause reconfiguration -- the data plane will be reconfigured. Without the fix, the latter changes would prevent the data plane from being reconfigured. Note that the bug only appeared when batching was added.
1 parent ee9d24b commit 7f831b5

File tree

9 files changed

+830
-372
lines changed

9 files changed

+830
-372
lines changed

internal/events/event.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import (
55
"sigs.k8s.io/controller-runtime/pkg/client"
66
)
77

8+
// EventBatch groups several events so that they can be handled together.
// FIXME(pleshakov): think about how to avoid using an interface{} here
type EventBatch []interface{}
11+
812
// UpsertEvent represents upserting a resource.
913
type UpsertEvent struct {
1014
// Resource is the resource that is being upserted.

internal/events/eventsfakes/fake_event_handler.go

Lines changed: 79 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/events/handler.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/go-logr/logr"
8+
apiv1 "k8s.io/api/core/v1"
9+
"sigs.k8s.io/gateway-api/apis/v1beta1"
10+
11+
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config"
12+
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/file"
13+
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/runtime"
14+
"github.com/nginxinc/nginx-kubernetes-gateway/internal/state"
15+
"github.com/nginxinc/nginx-kubernetes-gateway/internal/status"
16+
)
17+
18+
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . EventHandler
19+
20+
// EventHandler handle events.
21+
type EventHandler interface {
22+
// HandleEventBatch handles a batch of events.
23+
HandleEventBatch(ctx context.Context, batch EventBatch)
24+
}
25+
26+
// EventHandlerConfig holds configuration parameters for EventHandlerImpl.
27+
type EventHandlerConfig struct {
28+
// Processor is the state ChangeProcessor.
29+
Processor state.ChangeProcessor
30+
// ServiceStore is the state ServiceStore.
31+
ServiceStore state.ServiceStore
32+
// SecretStore is the state SecretStore.
33+
SecretStore state.SecretStore
34+
// SecretMemoryManager is the state SecretMemoryManager.
35+
SecretMemoryManager state.SecretDiskMemoryManager
36+
// Generator is the nginx config Generator.
37+
Generator config.Generator
38+
// Logger is the logger to be used by the EventHandler.
39+
Logger logr.Logger
40+
// NginxFileMgr is the file Manager for nginx.
41+
NginxFileMgr file.Manager
42+
// NginxRuntimeMgr manages nginx runtime.
43+
NginxRuntimeMgr runtime.Manager
44+
// StatusUpdater updates statuses on Kubernetes resources.
45+
StatusUpdater status.Updater
46+
}
47+
48+
// EventHandlerImpl implements EventHandler.
49+
// EventHandlerImpl is responsible for:
50+
// (1) Reconciling the Gateway API and Kubernetes built-in resources with the NGINX configuration.
51+
// (2) Keeping the statuses of the Gateway API resources updated.
52+
type EventHandlerImpl struct {
53+
cfg EventHandlerConfig
54+
}
55+
56+
// NewEventHandlerImpl creates a new EventHandlerImpl.
57+
func NewEventHandlerImpl(cfg EventHandlerConfig) *EventHandlerImpl {
58+
return &EventHandlerImpl{
59+
cfg: cfg,
60+
}
61+
}
62+
63+
func (h *EventHandlerImpl) HandleEventBatch(ctx context.Context, batch EventBatch) {
64+
65+
for _, event := range batch {
66+
switch e := event.(type) {
67+
case *UpsertEvent:
68+
h.propagateUpsert(e)
69+
case *DeleteEvent:
70+
h.propagateDelete(e)
71+
default:
72+
panic(fmt.Errorf("unknown event type %T", e))
73+
}
74+
}
75+
76+
changed, conf, statuses := h.cfg.Processor.Process()
77+
if !changed {
78+
h.cfg.Logger.Info("Handling events didn't result into NGINX configuration changes")
79+
return
80+
}
81+
82+
err := h.updateNginx(ctx, conf)
83+
if err != nil {
84+
h.cfg.Logger.Error(err, "Failed to update NGINX configuration")
85+
} else {
86+
h.cfg.Logger.Info("NGINX configuration was successfully updated")
87+
}
88+
89+
h.cfg.StatusUpdater.Update(ctx, statuses)
90+
}
91+
92+
func (h *EventHandlerImpl) updateNginx(ctx context.Context, conf state.Configuration) error {
93+
// Write all secrets (nuke and pave).
94+
// This will remove all secrets in the secrets directory before writing the requested secrets.
95+
// FIXME(kate-osborn): We may want to rethink this approach in the future and write and remove secrets individually.
96+
err := h.cfg.SecretMemoryManager.WriteAllRequestedSecrets()
97+
if err != nil {
98+
return err
99+
}
100+
101+
cfg, warnings := h.cfg.Generator.Generate(conf)
102+
103+
// For now, we keep all http servers in one config
104+
// We might rethink that. For example, we can write each server to its file
105+
// or group servers in some way.
106+
err = h.cfg.NginxFileMgr.WriteHTTPServersConfig("http-servers", cfg)
107+
if err != nil {
108+
return err
109+
}
110+
111+
for obj, objWarnings := range warnings {
112+
for _, w := range objWarnings {
113+
// FIXME(pleshakov): report warnings via Object status
114+
h.cfg.Logger.Info("Got warning while generating config",
115+
"kind", obj.GetObjectKind().GroupVersionKind().Kind,
116+
"namespace", obj.GetNamespace(),
117+
"name", obj.GetName(),
118+
"warning", w)
119+
}
120+
}
121+
122+
return h.cfg.NginxRuntimeMgr.Reload(ctx)
123+
}
124+
125+
func (h *EventHandlerImpl) propagateUpsert(e *UpsertEvent) {
126+
switch r := e.Resource.(type) {
127+
case *v1beta1.GatewayClass:
128+
h.cfg.Processor.CaptureUpsertChange(r)
129+
case *v1beta1.Gateway:
130+
h.cfg.Processor.CaptureUpsertChange(r)
131+
case *v1beta1.HTTPRoute:
132+
h.cfg.Processor.CaptureUpsertChange(r)
133+
case *apiv1.Service:
134+
// FIXME(pleshakov): make sure the affected hosts are updated
135+
h.cfg.ServiceStore.Upsert(r)
136+
case *apiv1.Secret:
137+
// FIXME(kate-osborn): need to handle certificate rotation
138+
h.cfg.SecretStore.Upsert(r)
139+
default:
140+
panic(fmt.Errorf("unknown resource type %T", e.Resource))
141+
}
142+
}
143+
144+
func (h *EventHandlerImpl) propagateDelete(e *DeleteEvent) {
145+
switch e.Type.(type) {
146+
case *v1beta1.GatewayClass:
147+
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
148+
case *v1beta1.Gateway:
149+
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
150+
case *v1beta1.HTTPRoute:
151+
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
152+
case *apiv1.Service:
153+
// FIXME(pleshakov): make sure the affected hosts are updated
154+
h.cfg.ServiceStore.Delete(e.NamespacedName)
155+
case *apiv1.Secret:
156+
// FIXME(kate-osborn): make sure that affected servers are updated
157+
h.cfg.SecretStore.Delete(e.NamespacedName)
158+
default:
159+
panic(fmt.Errorf("unknown resource type %T", e.Type))
160+
}
161+
}

0 commit comments

Comments
 (0)