Skip to content

Support service resolution #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions deploy/manifests/nginx-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: nginx-gateway
rules:
- apiGroups:
- ""
resources:
- services
verbs:
- list
- watch
- apiGroups:
- gateway.networking.k8s.io
resources:
Expand Down
66 changes: 66 additions & 0 deletions examples/cafe-example/cafe.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: coffee
spec:
replicas: 1
selector:
matchLabels:
app: coffee
template:
metadata:
labels:
app: coffee
spec:
containers:
- name: coffee
image: nginxdemos/nginx-hello:plain-text
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
name: coffee
spec:
ports:
- port: 80
targetPort: 8080
protocol: TCP
name: http
selector:
app: coffee
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tea
spec:
replicas: 1
selector:
matchLabels:
app: tea
template:
metadata:
labels:
app: tea
spec:
containers:
- name: tea
image: nginxdemos/nginx-hello:plain-text
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
name: tea
labels:
spec:
ports:
- port: 80
targetPort: 8080
protocol: TCP
name: http
selector:
app: tea
54 changes: 53 additions & 1 deletion internal/events/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,26 @@ import (
"github.com/go-logr/logr"
"github.com/nginxinc/nginx-gateway-kubernetes/internal/state"
"github.com/nginxinc/nginx-gateway-kubernetes/internal/status"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/gateway-api/apis/v1alpha2"
)

// EventLoop is the main event loop of the Gateway.
type EventLoop struct {
conf state.Configuration
serviceStore state.ServiceStore
eventCh <-chan interface{}
logger logr.Logger
statusUpdater status.Updater
}

// NewEventLoop creates a new EventLoop.
func NewEventLoop(conf state.Configuration, eventCh <-chan interface{}, statusUpdater status.Updater, logger logr.Logger) *EventLoop {
func NewEventLoop(conf state.Configuration, serviceStore state.ServiceStore, eventCh <-chan interface{},
statusUpdater status.Updater, logger logr.Logger) *EventLoop {
return &EventLoop{
conf: conf,
serviceStore: serviceStore,
eventCh: eventCh,
statusUpdater: statusUpdater,
logger: logger.WithName("eventLoop"),
Expand Down Expand Up @@ -75,8 +80,13 @@ func (el *EventLoop) propagateUpsert(e *UpsertEvent) ([]state.Change, []state.St
case *v1alpha2.HTTPRoute:
changes, statusUpdates := el.conf.UpsertHTTPRoute(r)
return changes, statusUpdates, nil
case *apiv1.Service:
el.serviceStore.Upsert(r)
// TO-DO: make sure the affected hosts are updated
return nil, nil, nil
}

// TO-DO: panic
return nil, nil, fmt.Errorf("unknown resource type %T", e.Resource)
}

Expand All @@ -85,8 +95,13 @@ func (el *EventLoop) propagateDelete(e *DeleteEvent) ([]state.Change, []state.St
case *v1alpha2.HTTPRoute:
changes, statusUpdates := el.conf.DeleteHTTPRoute(e.NamespacedName)
return changes, statusUpdates, nil
case *apiv1.Service:
el.serviceStore.Delete(e.NamespacedName)
// TO-DO: make sure the affected hosts are updated
return nil, nil, nil
}

// TO-DO: panic
return nil, nil, fmt.Errorf("unknown resource type %T", e.Type)
}

Expand All @@ -97,6 +112,43 @@ func (el *EventLoop) processChangesAndStatusUpdates(ctx context.Context, changes

// TO-DO: This code is temporary. We will remove it once we have a component that processes changes.
fmt.Printf("%+v\n", c)

if c.Op == state.Upsert {
// The code below resolves service backend refs into their cluster IPs
// TO-DO: this code will be removed once we have the component that generates NGINX config and
// uses the ServiceStore to resolve services.
for _, g := range c.Host.PathRouteGroups {
for _, r := range g.Routes {
for _, b := range r.Source.Spec.Rules[r.RuleIdx].BackendRefs {
if b.BackendRef.Kind == nil || *b.BackendRef.Kind == "Service" {
ns := r.Source.Namespace
if b.BackendRef.Namespace != nil {
ns = string(*b.BackendRef.Namespace)
}

address, err := el.serviceStore.Resolve(types.NamespacedName{
Namespace: ns,
Name: string(b.BackendRef.Name),
})

if err != nil {
fmt.Printf("Service %s/%s error: %v\n", ns, b.BackendRef.Name, err)
continue
}

var port int32 = 80
if b.BackendRef.Port != nil {
port = int32(*b.BackendRef.Port)
}

address = fmt.Sprintf("%s:%d", address, port)

fmt.Printf("Service %s/%s: %s\n", ns, b.BackendRef.Name, address)
}
}
}
}
}
}

el.statusUpdater.ProcessStatusUpdates(ctx, updates)
Expand Down
42 changes: 41 additions & 1 deletion internal/events/loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/nginxinc/nginx-gateway-kubernetes/internal/status/statusfakes"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -33,6 +34,7 @@ var _ = Describe("EventLoop", func() {
var ctrl *events.EventLoop
var fakeConf *statefakes.FakeConfiguration
var fakeUpdater *statusfakes.FakeUpdater
var fakeServiceStore *statefakes.FakeServiceStore
var cancel context.CancelFunc
var eventCh chan interface{}
var errorCh chan error
Expand All @@ -41,7 +43,8 @@ var _ = Describe("EventLoop", func() {
fakeConf = &statefakes.FakeConfiguration{}
eventCh = make(chan interface{})
fakeUpdater = &statusfakes.FakeUpdater{}
ctrl = events.NewEventLoop(fakeConf, eventCh, fakeUpdater, zap.New())
fakeServiceStore = &statefakes.FakeServiceStore{}
ctrl = events.NewEventLoop(fakeConf, fakeServiceStore, eventCh, fakeUpdater, zap.New())

var ctx context.Context

Expand Down Expand Up @@ -122,6 +125,43 @@ var _ = Describe("EventLoop", func() {
})
})

Describe("Process Service events", func() {
AfterEach(func() {
cancel()

var err error
Eventually(errorCh).Should(Receive(&err))
Expect(err).To(BeNil())
})

It("should process upsert event", func() {
svc := &apiv1.Service{}

eventCh <- &events.UpsertEvent{
Resource: svc,
}

Eventually(fakeServiceStore.UpsertCallCount).Should(Equal(1))
Eventually(func() *apiv1.Service {
return fakeServiceStore.UpsertArgsForCall(0)
}).Should(Equal(svc))
})

It("should process delete event", func() {
nsname := types.NamespacedName{Namespace: "test", Name: "service"}

eventCh <- &events.DeleteEvent{
NamespacedName: nsname,
Type: &apiv1.Service{},
}

Eventually(fakeServiceStore.DeleteCallCount).Should(Equal(1))
Eventually(func() types.NamespacedName {
return fakeServiceStore.DeleteArgsForCall(0)
}).Should(Equal(nsname))
})
})

Describe("Edge cases", func() {
AfterEach(func() {
cancel()
Expand Down
51 changes: 51 additions & 0 deletions internal/implementations/service/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package service

import (
"github.com/go-logr/logr"
"github.com/nginxinc/nginx-gateway-kubernetes/internal/config"
"github.com/nginxinc/nginx-gateway-kubernetes/internal/events"
"github.com/nginxinc/nginx-gateway-kubernetes/pkg/sdk"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

type serviceImplementation struct {
conf config.Config
eventCh chan<- interface{}
}

// TO-DO: serviceImplementation looks similar to httpRouteImplemenation
// consider if it is possible to reduce the amount of code.

// NewServiceImplementation creates a new ServiceImplementation.
func NewServiceImplementation(cfg config.Config, eventCh chan<- interface{}) sdk.ServiceImpl {
return &serviceImplementation{
conf: cfg,
eventCh: eventCh,
}
}

func (impl *serviceImplementation) Logger() logr.Logger {
return impl.conf.Logger
}

func (impl *serviceImplementation) Upsert(svc *apiv1.Service) {
impl.Logger().Info("Service was upserted",
"namespace", svc.Namespace, "name", svc.Name,
)

impl.eventCh <- &events.UpsertEvent{
Resource: svc,
}
}

func (impl *serviceImplementation) Remove(nsname types.NamespacedName) {
impl.Logger().Info("Service resource was removed",
"namespace", nsname.Namespace, "name", nsname.Name,
)

impl.eventCh <- &events.DeleteEvent{
NamespacedName: nsname,
Type: &apiv1.Service{},
}
}
11 changes: 10 additions & 1 deletion internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
gc "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/gatewayclass"
gcfg "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/gatewayconfig"
hr "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/httproute"
svc "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/service"
"github.com/nginxinc/nginx-gateway-kubernetes/internal/state"
"github.com/nginxinc/nginx-gateway-kubernetes/internal/status"
nginxgwv1alpha1 "github.com/nginxinc/nginx-gateway-kubernetes/pkg/apis/gateway/v1alpha1"
"github.com/nginxinc/nginx-gateway-kubernetes/pkg/sdk"
apiv1 "k8s.io/api/core/v1"

"k8s.io/apimachinery/pkg/runtime"
ctlr "sigs.k8s.io/controller-runtime"
Expand All @@ -27,8 +29,10 @@ const clusterTimeout = 10 * time.Second
var scheme = runtime.NewScheme()

func init() {
// TO-DO: handle errors returned by the calls bellow
_ = gatewayv1alpha2.AddToScheme(scheme)
_ = nginxgwv1alpha1.AddToScheme(scheme)
_ = apiv1.AddToScheme(scheme)
}

func Start(cfg config.Config) error {
Expand Down Expand Up @@ -64,10 +68,15 @@ func Start(cfg config.Config) error {
if err != nil {
return fmt.Errorf("cannot register httproute implementation: %w", err)
}
err = sdk.RegisterServiceController(mgr, svc.NewServiceImplementation(cfg, eventCh))
if err != nil {
return fmt.Errorf("cannot register service implementation: %w", err)
}

conf := state.NewConfiguration(cfg.GatewayCtlrName, state.NewRealClock())
serviceStore := state.NewServiceStore()
reporter := status.NewUpdater(mgr.GetClient(), cfg.Logger)
eventLoop := events.NewEventLoop(conf, eventCh, reporter, cfg.Logger)
eventLoop := events.NewEventLoop(conf, serviceStore, eventCh, reporter, cfg.Logger)

err = mgr.Add(eventLoop)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions internal/state/services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package state

import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ServiceStore

// ServiceStore stores services and can be queried for the cluster IP of a service.
type ServiceStore interface {
// Upsert upserts the service into the store.
Upsert(svc *v1.Service)
// Delete deletes the service from the store.
Delete(nsname types.NamespacedName)
// Resolve returns the cluster IP the service specified by its namespace and name.
// If the service doesn't have a cluster IP or it doesn't exist, resolve will return an error.
// TO-DO: later, we will start using the Endpoints rather than cluster IPs.
Resolve(nsname types.NamespacedName) (string, error)
}

// NewServiceStore creates a new ServiceStore.
func NewServiceStore() ServiceStore {
return &serviceStoreImpl{
services: make(map[string]*v1.Service),
}
}

type serviceStoreImpl struct {
services map[string]*v1.Service
}

func (s *serviceStoreImpl) Upsert(svc *v1.Service) {
s.services[getResourceKey(&svc.ObjectMeta)] = svc
}

func (s *serviceStoreImpl) Delete(nsname types.NamespacedName) {
delete(s.services, nsname.String())
}

func (s *serviceStoreImpl) Resolve(nsname types.NamespacedName) (string, error) {
svc, exist := s.services[nsname.String()]
if !exist {
return "", fmt.Errorf("service %s doesn't exist", nsname.String())
}

if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" {
return "", fmt.Errorf("service %s doesn't have ClusterIP", nsname.String())
}

return svc.Spec.ClusterIP, nil
}
Loading