Skip to content

Commit c4a0727

Browse files
authored
Feature/dynamic upstreams (#221)
Add support for dynamic upstreams Previously, NKG used the cluster IP of the Service to route traffic to the backend Services specified by HTTPRoutes. With this commit, NKG will use the endpoints of the Pods corresponding to a Service as the upstream servers for a backend Service. This change adds the following components: * EndpointSlice controller for caching and listing EndpointSlices. * Relationship.Capturer for tracking and reporting on relationships between Gateway API resources and non-Gateway API resources (e.g. Services). * ServiceResolver replaces the ServiceStore and resolves Service:Port to a list * of endpoints. This commit also adds upstreams to the nginx config generator. One upstream is generated for each unique and valid Service:Port BackendRef. If a BackendRef cannot be resolved, a 502 is returned. Known Limitations: * Traffic cannot be routed to Headless Services that do not have a defined port. * If a user manually creates and EndpointSlice, they will need to populate the "kubernetes.io/service-name" label and set the ready condition of the endpoints to true. Otherwise, NKG will fail to resolve the Service endpoints.
1 parent b946e9c commit c4a0727

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+4783
-1445
lines changed

deploy/manifests/nginx-gateway.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,13 @@ rules:
1717
verbs:
1818
- list
1919
- watch
20+
- apiGroups:
21+
- discovery.k8s.io
22+
resources:
23+
- endpointslices
24+
verbs:
25+
- list
26+
- watch
2027
- apiGroups:
2128
- gateway.networking.k8s.io
2229
resources:

docs/gateway-api-compatibility.md.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,16 @@ Fields:
8787
* `headers` - partially supported. Only `Exact` type.
8888
* `queryParams` - partially supported. Only `Exact` type.
8989
* `method` - supported.
90+
<<<<<<< HEAD
9091
* `filters`
9192
* `type` - supported.
9293
* `requestRedirect` - supported except for the experimental `path` field. If multiple filters with `requestRedirect` are configured, NGINX Kubernetes Gateway will choose the first one and ignore the rest.
9394
* `requestHeaderModifier`, `requestMirror`, `urlRewrite`, `extensionRef` - not supported.
9495
* `backendRefs` - partially supported. Only a single backend ref without support for `weight`. Backend ref `filters` are not supported. NGINX Kubernetes Gateway will use the IP of the Service as a backend, not the IPs of the corresponding Pods. Watching for Service updates is not supported.
96+
=======
97+
* `filters` - not supported.
98+
* `backendRefs` - partially supported. Only a single backend ref without support for `weight`. Backend ref `filters` are not supported.
99+
>>>>>>> 1697a08... Add support for dynamic upstreams
95100
* `status`
96101
* `parents`
97102
* `parentRef` - supported.
@@ -120,4 +125,4 @@ Fields:
120125
121126
Custom policies will be NGINX Kubernetes Gateway-specific CRDs that will allow supporting features like timeouts, load-balancing methods, authentication, etc. - important data-plane features that are not part of the Gateway API spec.
122127

123-
While those CRDs are not part of the Gateway API, the mechanism of attaching them to Gateway API resources is part of the Gateway API. See the [Policy Attachment doc](https://gateway-api.sigs.k8s.io/references/policy-attachment/).
128+
While those CRDs are not part of the Gateway API, the mechanism of attaching them to Gateway API resources is part of the Gateway API. See the [Policy Attachment doc](https://gateway-api.sigs.k8s.io/references/policy-attachment/).

internal/events/handler.go

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/go-logr/logr"
88
apiv1 "k8s.io/api/core/v1"
9+
discoveryV1 "k8s.io/api/discovery/v1"
910
"sigs.k8s.io/gateway-api/apis/v1beta1"
1011

1112
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config"
@@ -28,8 +29,6 @@ type EventHandler interface {
2829
type EventHandlerConfig struct {
2930
// Processor is the state ChangeProcessor.
3031
Processor state.ChangeProcessor
31-
// ServiceStore is the state ServiceStore.
32-
ServiceStore state.ServiceStore
3332
// SecretStore is the state SecretStore.
3433
SecretStore state.SecretStore
3534
// SecretMemoryManager is the state SecretMemoryManager.
@@ -73,7 +72,7 @@ func (h *EventHandlerImpl) HandleEventBatch(ctx context.Context, batch EventBatc
7372
}
7473
}
7574

76-
changed, conf, statuses := h.cfg.Processor.Process()
75+
changed, conf, statuses := h.cfg.Processor.Process(ctx)
7776
if !changed {
7877
h.cfg.Logger.Info("Handling events didn't result into NGINX configuration changes")
7978
return
@@ -98,27 +97,16 @@ func (h *EventHandlerImpl) updateNginx(ctx context.Context, conf state.Configura
9897
return err
9998
}
10099

101-
cfg, warnings := h.cfg.Generator.Generate(conf)
100+
cfg := h.cfg.Generator.Generate(conf)
102101

103-
// For now, we keep all http servers in one config
102+
// For now, we keep all http servers and upstreams in one config file.
104103
// We might rethink that. For example, we can write each server to its file
105104
// or group servers in some way.
106-
err = h.cfg.NginxFileMgr.WriteHTTPServersConfig("http-servers", cfg)
105+
err = h.cfg.NginxFileMgr.WriteHTTPConfig("http", cfg)
107106
if err != nil {
108107
return err
109108
}
110109

111-
for obj, objWarnings := range warnings {
112-
for _, w := range objWarnings {
113-
// FIXME(pleshakov): report warnings via Object status
114-
h.cfg.Logger.Info("Got warning while generating config",
115-
"kind", obj.GetObjectKind().GroupVersionKind().Kind,
116-
"namespace", obj.GetNamespace(),
117-
"name", obj.GetName(),
118-
"warning", w)
119-
}
120-
}
121-
122110
return h.cfg.NginxRuntimeMgr.Reload(ctx)
123111
}
124112

@@ -131,11 +119,12 @@ func (h *EventHandlerImpl) propagateUpsert(e *UpsertEvent) {
131119
case *v1beta1.HTTPRoute:
132120
h.cfg.Processor.CaptureUpsertChange(r)
133121
case *apiv1.Service:
134-
// FIXME(pleshakov): make sure the affected hosts are updated
135-
h.cfg.ServiceStore.Upsert(r)
122+
h.cfg.Processor.CaptureUpsertChange(r)
136123
case *apiv1.Secret:
137124
// FIXME(kate-osborn): need to handle certificate rotation
138125
h.cfg.SecretStore.Upsert(r)
126+
case *discoveryV1.EndpointSlice:
127+
h.cfg.Processor.CaptureUpsertChange(r)
139128
default:
140129
panic(fmt.Errorf("unknown resource type %T", e.Resource))
141130
}
@@ -150,11 +139,12 @@ func (h *EventHandlerImpl) propagateDelete(e *DeleteEvent) {
150139
case *v1beta1.HTTPRoute:
151140
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
152141
case *apiv1.Service:
153-
// FIXME(pleshakov): make sure the affected hosts are updated
154-
h.cfg.ServiceStore.Delete(e.NamespacedName)
142+
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
155143
case *apiv1.Secret:
156144
// FIXME(kate-osborn): make sure that affected servers are updated
157145
h.cfg.SecretStore.Delete(e.NamespacedName)
146+
case *discoveryV1.EndpointSlice:
147+
h.cfg.Processor.CaptureDeleteChange(e.Type, e.NamespacedName)
158148
default:
159149
panic(fmt.Errorf("unknown resource type %T", e.Type))
160150
}

internal/events/handler_test.go

Lines changed: 45 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
. "github.com/onsi/ginkgo/v2"
88
. "github.com/onsi/gomega"
99
apiv1 "k8s.io/api/core/v1"
10+
discoveryV1 "k8s.io/api/discovery/v1"
1011
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1112
"k8s.io/apimachinery/pkg/runtime"
1213
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -15,7 +16,6 @@ import (
1516
"sigs.k8s.io/gateway-api/apis/v1beta1"
1617

1718
"github.com/nginxinc/nginx-kubernetes-gateway/internal/events"
18-
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config"
1919
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/config/configfakes"
2020
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/file/filefakes"
2121
"github.com/nginxinc/nginx-kubernetes-gateway/internal/nginx/runtime/runtimefakes"
@@ -40,11 +40,10 @@ var _ = Describe("EventHandler", func() {
4040
var (
4141
handler *events.EventHandlerImpl
4242
fakeProcessor *statefakes.FakeChangeProcessor
43-
fakeServiceStore *statefakes.FakeServiceStore
4443
fakeSecretStore *statefakes.FakeSecretStore
4544
fakeSecretMemoryManager *statefakes.FakeSecretDiskMemoryManager
4645
fakeGenerator *configfakes.FakeGenerator
47-
fakeNginxFimeMgr *filefakes.FakeManager
46+
fakeNginxFileMgr *filefakes.FakeManager
4847
fakeNginxRuntimeMgr *runtimefakes.FakeManager
4948
fakeStatusUpdater *statusfakes.FakeUpdater
5049
)
@@ -55,9 +54,9 @@ var _ = Describe("EventHandler", func() {
5554
Expect(fakeGenerator.GenerateCallCount()).Should(Equal(1))
5655
Expect(fakeGenerator.GenerateArgsForCall(0)).Should(Equal(expectedConf))
5756

58-
Expect(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount()).Should(Equal(1))
59-
name, cfg := fakeNginxFimeMgr.WriteHTTPServersConfigArgsForCall(0)
60-
Expect(name).Should(Equal("http-servers"))
57+
Expect(fakeNginxFileMgr.WriteHTTPConfigCallCount()).Should(Equal(1))
58+
name, cfg := fakeNginxFileMgr.WriteHTTPConfigArgsForCall(0)
59+
Expect(name).Should(Equal("http"))
6160
Expect(cfg).Should(Equal(expectedCfg))
6261

6362
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).Should(Equal(1))
@@ -69,22 +68,20 @@ var _ = Describe("EventHandler", func() {
6968

7069
BeforeEach(func() {
7170
fakeProcessor = &statefakes.FakeChangeProcessor{}
72-
fakeServiceStore = &statefakes.FakeServiceStore{}
7371
fakeSecretMemoryManager = &statefakes.FakeSecretDiskMemoryManager{}
7472
fakeSecretStore = &statefakes.FakeSecretStore{}
7573
fakeGenerator = &configfakes.FakeGenerator{}
76-
fakeNginxFimeMgr = &filefakes.FakeManager{}
74+
fakeNginxFileMgr = &filefakes.FakeManager{}
7775
fakeNginxRuntimeMgr = &runtimefakes.FakeManager{}
7876
fakeStatusUpdater = &statusfakes.FakeUpdater{}
7977

8078
handler = events.NewEventHandlerImpl(events.EventHandlerConfig{
8179
Processor: fakeProcessor,
82-
ServiceStore: fakeServiceStore,
8380
SecretStore: fakeSecretStore,
8481
SecretMemoryManager: fakeSecretMemoryManager,
8582
Generator: fakeGenerator,
8683
Logger: zap.New(),
87-
NginxFileMgr: fakeNginxFimeMgr,
84+
NginxFileMgr: fakeNginxFileMgr,
8885
NginxRuntimeMgr: fakeNginxRuntimeMgr,
8986
StatusUpdater: fakeStatusUpdater,
9087
})
@@ -99,7 +96,7 @@ var _ = Describe("EventHandler", func() {
9996
fakeProcessor.ProcessReturns(changed, fakeConf, fakeStatuses)
10097

10198
fakeCfg := []byte("fake")
102-
fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{})
99+
fakeGenerator.GenerateReturns(fakeCfg)
103100

104101
batch := []interface{}{e}
105102

@@ -125,85 +122,58 @@ var _ = Describe("EventHandler", func() {
125122
Entry("HTTPRoute upsert", &events.UpsertEvent{Resource: &v1beta1.HTTPRoute{}}),
126123
Entry("Gateway upsert", &events.UpsertEvent{Resource: &v1beta1.Gateway{}}),
127124
Entry("GatewayClass upsert", &events.UpsertEvent{Resource: &v1beta1.GatewayClass{}}),
125+
Entry("Service upsert", &events.UpsertEvent{Resource: &apiv1.Service{}}),
126+
Entry("EndpointSlice upsert", &events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{}}),
127+
128128
Entry("HTTPRoute delete", &events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}}),
129129
Entry("Gateway delete", &events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}}),
130130
Entry("GatewayClass delete", &events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}}),
131+
Entry("Service delete", &events.DeleteEvent{Type: &apiv1.Service{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "service"}}),
132+
Entry("EndpointSlice deleted", &events.DeleteEvent{Type: &discoveryV1.EndpointSlice{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "endpointslice"}}),
131133
)
132134
})
133135

134-
Describe("Process Kubernetes resources events", func() {
136+
Describe("Process Secret events", func() {
135137
expectNoReconfig := func() {
136138
Expect(fakeProcessor.ProcessCallCount()).Should(Equal(1))
137139
Expect(fakeGenerator.GenerateCallCount()).Should(Equal(0))
138-
Expect(fakeNginxFimeMgr.WriteHTTPServersConfigCallCount()).Should(Equal(0))
140+
Expect(fakeNginxFileMgr.WriteHTTPConfigCallCount()).Should(Equal(0))
139141
Expect(fakeNginxRuntimeMgr.ReloadCallCount()).Should(Equal(0))
140142
Expect(fakeStatusUpdater.UpdateCallCount()).Should(Equal(0))
141143
}
144+
It("should process upsert event", func() {
145+
secret := &apiv1.Secret{}
142146

143-
Describe("Process Service events", func() {
144-
It("should process upsert event", func() {
145-
svc := &apiv1.Service{}
146-
147-
batch := []interface{}{&events.UpsertEvent{
148-
Resource: svc,
149-
}}
150-
151-
handler.HandleEventBatch(context.TODO(), batch)
152-
153-
Expect(fakeServiceStore.UpsertCallCount()).Should(Equal(1))
154-
Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc))
155-
156-
expectNoReconfig()
157-
})
158-
159-
It("should process delete event", func() {
160-
nsname := types.NamespacedName{Namespace: "test", Name: "service"}
161-
162-
batch := []interface{}{&events.DeleteEvent{
163-
NamespacedName: nsname,
164-
Type: &apiv1.Service{},
165-
}}
166-
167-
handler.HandleEventBatch(context.TODO(), batch)
168-
169-
Expect(fakeServiceStore.DeleteCallCount()).Should(Equal(1))
170-
Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(nsname))
171-
172-
expectNoReconfig()
173-
})
174-
})
175-
176-
Describe("Process Secret events", func() {
177-
It("should process upsert event", func() {
178-
secret := &apiv1.Secret{}
179-
180-
batch := []interface{}{&events.UpsertEvent{
147+
batch := []interface{}{
148+
&events.UpsertEvent{
181149
Resource: secret,
182-
}}
150+
},
151+
}
183152

184-
handler.HandleEventBatch(context.TODO(), batch)
153+
handler.HandleEventBatch(context.TODO(), batch)
185154

186-
Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
187-
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))
155+
Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
156+
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))
188157

189-
expectNoReconfig()
190-
})
158+
expectNoReconfig()
159+
})
191160

192-
It("should process delete event", func() {
193-
nsname := types.NamespacedName{Namespace: "test", Name: "secret"}
161+
It("should process delete event", func() {
162+
nsname := types.NamespacedName{Namespace: "test", Name: "secret"}
194163

195-
batch := []interface{}{&events.DeleteEvent{
164+
batch := []interface{}{
165+
&events.DeleteEvent{
196166
NamespacedName: nsname,
197167
Type: &apiv1.Secret{},
198-
}}
168+
},
169+
}
199170

200-
handler.HandleEventBatch(context.TODO(), batch)
171+
handler.HandleEventBatch(context.TODO(), batch)
201172

202-
Expect(fakeSecretStore.DeleteCallCount()).Should(Equal(1))
203-
Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(nsname))
173+
Expect(fakeSecretStore.DeleteCallCount()).Should(Equal(1))
174+
Expect(fakeSecretStore.DeleteArgsForCall(0)).Should(Equal(nsname))
204175

205-
expectNoReconfig()
206-
})
176+
expectNoReconfig()
207177
})
208178
})
209179

@@ -218,13 +188,15 @@ var _ = Describe("EventHandler", func() {
218188
&events.UpsertEvent{Resource: &v1beta1.Gateway{}},
219189
&events.UpsertEvent{Resource: &v1beta1.GatewayClass{}},
220190
&events.UpsertEvent{Resource: svc},
191+
&events.UpsertEvent{Resource: &discoveryV1.EndpointSlice{}},
221192
&events.UpsertEvent{Resource: secret},
222193
}
223194
deletes := []interface{}{
224195
&events.DeleteEvent{Type: &v1beta1.HTTPRoute{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "route"}},
225196
&events.DeleteEvent{Type: &v1beta1.Gateway{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "gateway"}},
226197
&events.DeleteEvent{Type: &v1beta1.GatewayClass{}, NamespacedName: types.NamespacedName{Name: "class"}},
227198
&events.DeleteEvent{Type: &apiv1.Service{}, NamespacedName: svcNsName},
199+
&events.DeleteEvent{Type: &discoveryV1.EndpointSlice{}, NamespacedName: types.NamespacedName{Namespace: "test", Name: "endpointslice"}},
228200
&events.DeleteEvent{Type: &apiv1.Secret{}, NamespacedName: secretNsName},
229201
}
230202

@@ -238,34 +210,27 @@ var _ = Describe("EventHandler", func() {
238210
fakeProcessor.ProcessReturns(changed, fakeConf, fakeStatuses)
239211

240212
fakeCfg := []byte("fake")
241-
fakeGenerator.GenerateReturns(fakeCfg, config.Warnings{})
213+
fakeGenerator.GenerateReturns(fakeCfg)
242214

243215
handler.HandleEventBatch(context.TODO(), batch)
244216

245217
// Check that the events for Gateway API resources were captured
246218

247-
// 3, not 5, because the last 2 do not result into CaptureUpsertChange() call
248-
Expect(fakeProcessor.CaptureUpsertChangeCallCount()).Should(Equal(3))
249-
for i := 0; i < 3; i++ {
219+
// 5, not 6, because secret upsert events do not result into CaptureUpsertChange() call
220+
Expect(fakeProcessor.CaptureUpsertChangeCallCount()).Should(Equal(5))
221+
for i := 0; i < 5; i++ {
250222
Expect(fakeProcessor.CaptureUpsertChangeArgsForCall(i)).Should(Equal(upserts[i].(*events.UpsertEvent).Resource))
251223
}
252-
Expect(fakeProcessor.CaptureDeleteChangeCallCount()).Should(Equal(3))
253224

254-
// 3, not 5, because the last 2 do not result into CaptureDeleteChange() call
255-
for i := 0; i < 3; i++ {
225+
// 5, not 6, because secret delete events do not result into CaptureDeleteChange() call
226+
Expect(fakeProcessor.CaptureDeleteChangeCallCount()).Should(Equal(5))
227+
for i := 0; i < 5; i++ {
256228
d := deletes[i].(*events.DeleteEvent)
257229
passedObj, passedNsName := fakeProcessor.CaptureDeleteChangeArgsForCall(i)
258230
Expect(passedObj).Should(Equal(d.Type))
259231
Expect(passedNsName).Should(Equal(d.NamespacedName))
260232
}
261233

262-
// Check Service-related expectations
263-
Expect(fakeServiceStore.UpsertCallCount()).Should(Equal(1))
264-
Expect(fakeServiceStore.UpsertArgsForCall(0)).Should(Equal(svc))
265-
266-
Expect(fakeServiceStore.DeleteCallCount()).Should(Equal(1))
267-
Expect(fakeServiceStore.DeleteArgsForCall(0)).Should(Equal(svcNsName))
268-
269234
// Check Secret-related expectations
270235
Expect(fakeSecretStore.UpsertCallCount()).Should(Equal(1))
271236
Expect(fakeSecretStore.UpsertArgsForCall(0)).Should(Equal(secret))

internal/helpers/helpers.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,8 @@ func GetQueryParamMatchTypePointer(t v1beta1.QueryParamMatchType) *v1beta1.Query
5151
func GetTLSModePointer(t v1beta1.TLSModeType) *v1beta1.TLSModeType {
5252
return &t
5353
}
54+
55+
// GetBoolPointer takes a bool and returns a pointer to it. Useful in unit tests when initializing structs.
56+
func GetBoolPointer(b bool) *bool {
57+
return &b
58+
}

0 commit comments

Comments
 (0)