Skip to content

Commit d840429

Browse files
committed
Support service resolution (nginx#68)
This commit introduces ServiceStore component that stores services and supports resolving service namespace and name into its ClusterIP. In the future, the component will be extended to support Endpoints.
1 parent 7cf9450 commit d840429

File tree

11 files changed

+650
-3
lines changed

11 files changed

+650
-3
lines changed

deploy/manifests/nginx-gateway.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ apiVersion: rbac.authorization.k8s.io/v1
1414
metadata:
1515
name: nginx-gateway
1616
rules:
17+
- apiGroups:
18+
- ""
19+
resources:
20+
- services
21+
verbs:
22+
- list
23+
- watch
1724
- apiGroups:
1825
- gateway.networking.k8s.io
1926
resources:

examples/cafe-example/cafe.yaml

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
apiVersion: apps/v1
2+
kind: Deployment
3+
metadata:
4+
name: coffee
5+
spec:
6+
replicas: 1
7+
selector:
8+
matchLabels:
9+
app: coffee
10+
template:
11+
metadata:
12+
labels:
13+
app: coffee
14+
spec:
15+
containers:
16+
- name: coffee
17+
image: nginxdemos/nginx-hello:plain-text
18+
ports:
19+
- containerPort: 8080
20+
---
21+
apiVersion: v1
22+
kind: Service
23+
metadata:
24+
name: coffee
25+
spec:
26+
ports:
27+
- port: 80
28+
targetPort: 8080
29+
protocol: TCP
30+
name: http
31+
selector:
32+
app: coffee
33+
---
34+
apiVersion: apps/v1
35+
kind: Deployment
36+
metadata:
37+
name: tea
38+
spec:
39+
replicas: 1
40+
selector:
41+
matchLabels:
42+
app: tea
43+
template:
44+
metadata:
45+
labels:
46+
app: tea
47+
spec:
48+
containers:
49+
- name: tea
50+
image: nginxdemos/nginx-hello:plain-text
51+
ports:
52+
- containerPort: 8080
53+
---
54+
apiVersion: v1
55+
kind: Service
56+
metadata:
57+
name: tea
58+
labels:
59+
spec:
60+
ports:
61+
- port: 80
62+
targetPort: 8080
63+
protocol: TCP
64+
name: http
65+
selector:
66+
app: tea

internal/events/loop.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,26 @@ import (
77
"github.com/go-logr/logr"
88
"github.com/nginxinc/nginx-gateway-kubernetes/internal/state"
99
"github.com/nginxinc/nginx-gateway-kubernetes/internal/status"
10+
apiv1 "k8s.io/api/core/v1"
11+
"k8s.io/apimachinery/pkg/types"
1012
"sigs.k8s.io/gateway-api/apis/v1alpha2"
1113
)
1214

1315
// EventLoop is the main event loop of the Gateway.
1416
type EventLoop struct {
1517
conf state.Configuration
18+
serviceStore state.ServiceStore
1619
eventCh <-chan interface{}
1720
logger logr.Logger
1821
statusUpdater status.Updater
1922
}
2023

2124
// NewEventLoop creates a new EventLoop.
22-
func NewEventLoop(conf state.Configuration, eventCh <-chan interface{}, statusUpdater status.Updater, logger logr.Logger) *EventLoop {
25+
func NewEventLoop(conf state.Configuration, serviceStore state.ServiceStore, eventCh <-chan interface{},
26+
statusUpdater status.Updater, logger logr.Logger) *EventLoop {
2327
return &EventLoop{
2428
conf: conf,
29+
serviceStore: serviceStore,
2530
eventCh: eventCh,
2631
statusUpdater: statusUpdater,
2732
logger: logger.WithName("eventLoop"),
@@ -75,8 +80,13 @@ func (el *EventLoop) propagateUpsert(e *UpsertEvent) ([]state.Change, []state.St
7580
case *v1alpha2.HTTPRoute:
7681
changes, statusUpdates := el.conf.UpsertHTTPRoute(r)
7782
return changes, statusUpdates, nil
83+
case *apiv1.Service:
84+
el.serviceStore.Upsert(r)
85+
// TO-DO: make sure the affected hosts are updated
86+
return nil, nil, nil
7887
}
7988

89+
// TO-DO: panic
8090
return nil, nil, fmt.Errorf("unknown resource type %T", e.Resource)
8191
}
8292

@@ -85,8 +95,13 @@ func (el *EventLoop) propagateDelete(e *DeleteEvent) ([]state.Change, []state.St
8595
case *v1alpha2.HTTPRoute:
8696
changes, statusUpdates := el.conf.DeleteHTTPRoute(e.NamespacedName)
8797
return changes, statusUpdates, nil
98+
case *apiv1.Service:
99+
el.serviceStore.Delete(e.NamespacedName)
100+
// TO-DO: make sure the affected hosts are updated
101+
return nil, nil, nil
88102
}
89103

104+
// TO-DO: panic
90105
return nil, nil, fmt.Errorf("unknown resource type %T", e.Type)
91106
}
92107

@@ -97,6 +112,43 @@ func (el *EventLoop) processChangesAndStatusUpdates(ctx context.Context, changes
97112

98113
// TO-DO: This code is temporary. We will remove it once we have a component that processes changes.
99114
fmt.Printf("%+v\n", c)
115+
116+
if c.Op == state.Upsert {
117+
// The code below resolves service backend refs into their cluster IPs
118+
// TO-DO: this code will be removed once we have the component that generates NGINX config and
119+
// uses the ServiceStore to resolve services.
120+
for _, g := range c.Host.PathRouteGroups {
121+
for _, r := range g.Routes {
122+
for _, b := range r.Source.Spec.Rules[r.RuleIdx].BackendRefs {
123+
if b.BackendRef.Kind == nil || *b.BackendRef.Kind == "Service" {
124+
ns := r.Source.Namespace
125+
if b.BackendRef.Namespace != nil {
126+
ns = string(*b.BackendRef.Namespace)
127+
}
128+
129+
address, err := el.serviceStore.Resolve(types.NamespacedName{
130+
Namespace: ns,
131+
Name: string(b.BackendRef.Name),
132+
})
133+
134+
if err != nil {
135+
fmt.Printf("Service %s/%s error: %v\n", ns, b.BackendRef.Name, err)
136+
continue
137+
}
138+
139+
var port int32 = 80
140+
if b.BackendRef.Port != nil {
141+
port = int32(*b.BackendRef.Port)
142+
}
143+
144+
address = fmt.Sprintf("%s:%d", address, port)
145+
146+
fmt.Printf("Service %s/%s: %s\n", ns, b.BackendRef.Name, address)
147+
}
148+
}
149+
}
150+
}
151+
}
100152
}
101153

102154
el.statusUpdater.ProcessStatusUpdates(ctx, updates)

internal/events/loop_test.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/nginxinc/nginx-gateway-kubernetes/internal/status/statusfakes"
1010
. "github.com/onsi/ginkgo/v2"
1111
. "github.com/onsi/gomega"
12+
apiv1 "k8s.io/api/core/v1"
1213
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1314
"k8s.io/apimachinery/pkg/runtime"
1415
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -33,6 +34,7 @@ var _ = Describe("EventLoop", func() {
3334
var ctrl *events.EventLoop
3435
var fakeConf *statefakes.FakeConfiguration
3536
var fakeUpdater *statusfakes.FakeUpdater
37+
var fakeServiceStore *statefakes.FakeServiceStore
3638
var cancel context.CancelFunc
3739
var eventCh chan interface{}
3840
var errorCh chan error
@@ -41,7 +43,8 @@ var _ = Describe("EventLoop", func() {
4143
fakeConf = &statefakes.FakeConfiguration{}
4244
eventCh = make(chan interface{})
4345
fakeUpdater = &statusfakes.FakeUpdater{}
44-
ctrl = events.NewEventLoop(fakeConf, eventCh, fakeUpdater, zap.New())
46+
fakeServiceStore = &statefakes.FakeServiceStore{}
47+
ctrl = events.NewEventLoop(fakeConf, fakeServiceStore, eventCh, fakeUpdater, zap.New())
4548

4649
var ctx context.Context
4750

@@ -122,6 +125,43 @@ var _ = Describe("EventLoop", func() {
122125
})
123126
})
124127

128+
Describe("Process Service events", func() {
129+
AfterEach(func() {
130+
cancel()
131+
132+
var err error
133+
Eventually(errorCh).Should(Receive(&err))
134+
Expect(err).To(BeNil())
135+
})
136+
137+
It("should process upsert event", func() {
138+
svc := &apiv1.Service{}
139+
140+
eventCh <- &events.UpsertEvent{
141+
Resource: svc,
142+
}
143+
144+
Eventually(fakeServiceStore.UpsertCallCount).Should(Equal(1))
145+
Eventually(func() *apiv1.Service {
146+
return fakeServiceStore.UpsertArgsForCall(0)
147+
}).Should(Equal(svc))
148+
})
149+
150+
It("should process delete event", func() {
151+
nsname := types.NamespacedName{Namespace: "test", Name: "service"}
152+
153+
eventCh <- &events.DeleteEvent{
154+
NamespacedName: nsname,
155+
Type: &apiv1.Service{},
156+
}
157+
158+
Eventually(fakeServiceStore.DeleteCallCount).Should(Equal(1))
159+
Eventually(func() types.NamespacedName {
160+
return fakeServiceStore.DeleteArgsForCall(0)
161+
}).Should(Equal(nsname))
162+
})
163+
})
164+
125165
Describe("Edge cases", func() {
126166
AfterEach(func() {
127167
cancel()
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package service
2+
3+
import (
4+
"github.com/go-logr/logr"
5+
"github.com/nginxinc/nginx-gateway-kubernetes/internal/config"
6+
"github.com/nginxinc/nginx-gateway-kubernetes/internal/events"
7+
"github.com/nginxinc/nginx-gateway-kubernetes/pkg/sdk"
8+
apiv1 "k8s.io/api/core/v1"
9+
"k8s.io/apimachinery/pkg/types"
10+
)
11+
12+
type serviceImplementation struct {
13+
conf config.Config
14+
eventCh chan<- interface{}
15+
}
16+
17+
// TO-DO: serviceImplementation looks similar to httpRouteImplemenation
18+
// consider if it is possible to reduce the amount of code.
19+
20+
// NewServiceImplementation creates a new ServiceImplementation.
21+
func NewServiceImplementation(cfg config.Config, eventCh chan<- interface{}) sdk.ServiceImpl {
22+
return &serviceImplementation{
23+
conf: cfg,
24+
eventCh: eventCh,
25+
}
26+
}
27+
28+
func (impl *serviceImplementation) Logger() logr.Logger {
29+
return impl.conf.Logger
30+
}
31+
32+
func (impl *serviceImplementation) Upsert(svc *apiv1.Service) {
33+
impl.Logger().Info("Service was upserted",
34+
"namespace", svc.Namespace, "name", svc.Name,
35+
)
36+
37+
impl.eventCh <- &events.UpsertEvent{
38+
Resource: svc,
39+
}
40+
}
41+
42+
func (impl *serviceImplementation) Remove(nsname types.NamespacedName) {
43+
impl.Logger().Info("Service resource was removed",
44+
"namespace", nsname.Namespace, "name", nsname.Name,
45+
)
46+
47+
impl.eventCh <- &events.DeleteEvent{
48+
NamespacedName: nsname,
49+
Type: &apiv1.Service{},
50+
}
51+
}

internal/manager/manager.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ import (
1010
gc "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/gatewayclass"
1111
gcfg "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/gatewayconfig"
1212
hr "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/httproute"
13+
svc "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/service"
1314
"github.com/nginxinc/nginx-gateway-kubernetes/internal/state"
1415
"github.com/nginxinc/nginx-gateway-kubernetes/internal/status"
1516
nginxgwv1alpha1 "github.com/nginxinc/nginx-gateway-kubernetes/pkg/apis/gateway/v1alpha1"
1617
"github.com/nginxinc/nginx-gateway-kubernetes/pkg/sdk"
18+
apiv1 "k8s.io/api/core/v1"
1719

1820
"k8s.io/apimachinery/pkg/runtime"
1921
ctlr "sigs.k8s.io/controller-runtime"
@@ -27,8 +29,10 @@ const clusterTimeout = 10 * time.Second
2729
var scheme = runtime.NewScheme()
2830

2931
func init() {
32+
// TO-DO: handle errors returned by the calls bellow
3033
_ = gatewayv1alpha2.AddToScheme(scheme)
3134
_ = nginxgwv1alpha1.AddToScheme(scheme)
35+
_ = apiv1.AddToScheme(scheme)
3236
}
3337

3438
func Start(cfg config.Config) error {
@@ -64,10 +68,15 @@ func Start(cfg config.Config) error {
6468
if err != nil {
6569
return fmt.Errorf("cannot register httproute implementation: %w", err)
6670
}
71+
err = sdk.RegisterServiceController(mgr, svc.NewServiceImplementation(cfg, eventCh))
72+
if err != nil {
73+
return fmt.Errorf("cannot register service implementation: %w", err)
74+
}
6775

6876
conf := state.NewConfiguration(cfg.GatewayCtlrName, state.NewRealClock())
77+
serviceStore := state.NewServiceStore()
6978
reporter := status.NewUpdater(mgr.GetClient(), cfg.Logger)
70-
eventLoop := events.NewEventLoop(conf, eventCh, reporter, cfg.Logger)
79+
eventLoop := events.NewEventLoop(conf, serviceStore, eventCh, reporter, cfg.Logger)
7180

7281
err = mgr.Add(eventLoop)
7382
if err != nil {

internal/state/services.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package state
2+
3+
import (
4+
"fmt"
5+
6+
v1 "k8s.io/api/core/v1"
7+
"k8s.io/apimachinery/pkg/types"
8+
)
9+
10+
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ServiceStore
11+
12+
// ServiceStore stores services and can be queried for the cluster IP of a service.
13+
type ServiceStore interface {
14+
// Upsert upserts the service into the store.
15+
Upsert(svc *v1.Service)
16+
// Delete deletes the service from the store.
17+
Delete(nsname types.NamespacedName)
18+
// Resolve returns the cluster IP the service specified by its namespace and name.
19+
// If the service doesn't have a cluster IP or it doesn't exist, resolve will return an error.
20+
// TO-DO: later, we will start using the Endpoints rather than cluster IPs.
21+
Resolve(nsname types.NamespacedName) (string, error)
22+
}
23+
24+
// NewServiceStore creates a new ServiceStore.
25+
func NewServiceStore() ServiceStore {
26+
return &serviceStoreImpl{
27+
services: make(map[string]*v1.Service),
28+
}
29+
}
30+
31+
type serviceStoreImpl struct {
32+
services map[string]*v1.Service
33+
}
34+
35+
func (s *serviceStoreImpl) Upsert(svc *v1.Service) {
36+
s.services[getResourceKey(&svc.ObjectMeta)] = svc
37+
}
38+
39+
func (s *serviceStoreImpl) Delete(nsname types.NamespacedName) {
40+
delete(s.services, nsname.String())
41+
}
42+
43+
func (s *serviceStoreImpl) Resolve(nsname types.NamespacedName) (string, error) {
44+
svc, exist := s.services[nsname.String()]
45+
if !exist {
46+
return "", fmt.Errorf("service %s doesn't exist", nsname.String())
47+
}
48+
49+
if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
50+
return "", fmt.Errorf("service %s doesn't have ClusterIP", nsname.String())
51+
}
52+
53+
return svc.Spec.ClusterIP, nil
54+
}

0 commit comments

Comments
 (0)