From 835f7d5ca1afd6a79e1d767a75bc9782d970a6cf Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Fri, 11 Mar 2022 11:30:21 -0800 Subject: [PATCH 1/2] Support service resolution This commit introduces ServiceStore component that stores services and supports resolving service namespace and name into its ClusterIP. In the future, the component will be extended to support Endpoints --- deploy/manifests/nginx-gateway.yaml | 7 + examples/cafe-example/cafe.yaml | 66 ++++++ internal/events/loop.go | 53 ++++- internal/events/loop_test.go | 34 ++- internal/implementations/service/service.go | 51 +++++ internal/manager/manager.go | 11 +- internal/state/services.go | 54 +++++ internal/state/services_test.go | 104 ++++++++++ .../state/statefakes/fake_service_store.go | 196 ++++++++++++++++++ pkg/sdk/interfaces.go | 6 + pkg/sdk/service_controller.go | 62 ++++++ 11 files changed, 641 insertions(+), 3 deletions(-) create mode 100644 examples/cafe-example/cafe.yaml create mode 100644 internal/implementations/service/service.go create mode 100644 internal/state/services.go create mode 100644 internal/state/services_test.go create mode 100644 internal/state/statefakes/fake_service_store.go create mode 100644 pkg/sdk/service_controller.go diff --git a/deploy/manifests/nginx-gateway.yaml b/deploy/manifests/nginx-gateway.yaml index fa7a773b8e..defad3a0ff 100644 --- a/deploy/manifests/nginx-gateway.yaml +++ b/deploy/manifests/nginx-gateway.yaml @@ -14,6 +14,13 @@ apiVersion: rbac.authorization.k8s.io/v1 metadata: name: nginx-gateway rules: +- apiGroups: + - "" + resources: + - services + verbs: + - list + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/cafe-example/cafe.yaml b/examples/cafe-example/cafe.yaml new file mode 100644 index 0000000000..d723ab0b7b --- /dev/null +++ b/examples/cafe-example/cafe.yaml @@ -0,0 +1,66 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: coffee +spec: + replicas: 1 + selector: + matchLabels: + app: coffee + template: + metadata: + labels: + app: coffee + spec: + containers: + - name: coffee + image: nginxdemos/nginx-hello:plain-text + ports: + - containerPort: 8080 +--- +apiVersion: v1 +kind: Service +metadata: + name: coffee +spec: + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + name: http + selector: + app: coffee +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tea +spec: + replicas: 1 + selector: + matchLabels: + app: tea + template: + metadata: + labels: + app: tea + spec: + containers: + - name: tea + image: nginxdemos/nginx-hello:plain-text + ports: + - containerPort: 8080 +--- +apiVersion: v1 +kind: Service +metadata: + name: tea + labels: +spec: + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + name: http + selector: + app: tea diff --git a/internal/events/loop.go b/internal/events/loop.go index 4c9e71ebfd..e6fd6c5947 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -7,21 +7,26 @@ import ( "github.com/go-logr/logr" "github.com/nginxinc/nginx-gateway-kubernetes/internal/state" "github.com/nginxinc/nginx-gateway-kubernetes/internal/status" + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api/apis/v1alpha2" ) // EventLoop is the main event loop of the Gateway. type EventLoop struct { conf state.Configuration + serviceStore state.ServiceStore eventCh <-chan interface{} logger logr.Logger statusUpdater status.Updater } // NewEventLoop creates a new EventLoop. -func NewEventLoop(conf state.Configuration, eventCh <-chan interface{}, statusUpdater status.Updater, logger logr.Logger) *EventLoop { +func NewEventLoop(conf state.Configuration, serviceStore state.ServiceStore, eventCh <-chan interface{}, + statusUpdater status.Updater, logger logr.Logger) *EventLoop { return &EventLoop{ conf: conf, + serviceStore: serviceStore, eventCh: eventCh, statusUpdater: statusUpdater, logger: logger.WithName("eventLoop"), @@ -75,8 +80,13 @@ func (el *EventLoop) propagateUpsert(e *UpsertEvent) ([]state.Change, []state.St case *v1alpha2.HTTPRoute: changes, statusUpdates := el.conf.UpsertHTTPRoute(r) return changes, statusUpdates, nil + case *apiv1.Service: + el.serviceStore.Upsert(r) + // TO-DO: make sure the affected hosts are updated + return nil, nil, nil } + // TO-DO: panic return nil, nil, fmt.Errorf("unknown resource type %T", e.Resource) } @@ -85,8 +95,12 @@ func (el *EventLoop) propagateDelete(e *DeleteEvent) ([]state.Change, []state.St case *v1alpha2.HTTPRoute: changes, statusUpdates := el.conf.DeleteHTTPRoute(e.NamespacedName) return changes, statusUpdates, nil + case *apiv1.Service: + el.serviceStore.Delete(e.NamespacedName) + // TO-DO: make sure the affected hosts are updated } + // TO-DO: panic return nil, nil, fmt.Errorf("unknown resource type %T", e.Type) } @@ -97,6 +111,43 @@ func (el *EventLoop) processChangesAndStatusUpdates(ctx context.Context, changes // TO-DO: This code is temporary. We will remove it once we have a component that processes changes. fmt.Printf("%+v\n", c) + + if c.Op == state.Upsert { + // The code below resolves service backend refs into their cluster IPs + // TO-DO: this code will be removed once we have the component that generates NGINX config and + // uses the ServiceStore to resolve services. + for _, g := range c.Host.PathRouteGroups { + for _, r := range g.Routes { + for _, b := range r.Source.Spec.Rules[r.RuleIdx].BackendRefs { + if b.BackendRef.Kind == nil || *b.BackendRef.Kind == "Service" { + ns := r.Source.Namespace + if b.BackendRef.Namespace != nil { + ns = string(*b.BackendRef.Namespace) + } + + address, err := el.serviceStore.Resolve(types.NamespacedName{ + Namespace: ns, + Name: string(b.BackendRef.Name), + }) + + if err != nil { + fmt.Printf("Service %s/%s error: %v\n", ns, b.BackendRef.Name, err) + continue + } + + var port int32 = 80 + if b.BackendRef.Port != nil { + port = int32(*b.BackendRef.Port) + } + + address = fmt.Sprintf("%s:%d", address, port) + + fmt.Printf("Service %s/%s: %s\n", ns, b.BackendRef.Name, address) + } + } + } + } + } } el.statusUpdater.ProcessStatusUpdates(ctx, updates) diff --git a/internal/events/loop_test.go b/internal/events/loop_test.go index 24a7498865..e0cee345b3 100644 --- a/internal/events/loop_test.go +++ b/internal/events/loop_test.go @@ -9,6 +9,7 @@ import ( "github.com/nginxinc/nginx-gateway-kubernetes/internal/status/statusfakes" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,6 +34,7 @@ var _ = Describe("EventLoop", func() { var ctrl *events.EventLoop var fakeConf *statefakes.FakeConfiguration var fakeUpdater *statusfakes.FakeUpdater + var fakeServiceStore *statefakes.FakeServiceStore var cancel context.CancelFunc var eventCh chan interface{} var errorCh chan error @@ -41,7 +43,8 @@ var _ = Describe("EventLoop", func() { fakeConf = &statefakes.FakeConfiguration{} eventCh = make(chan interface{}) fakeUpdater = &statusfakes.FakeUpdater{} - ctrl = events.NewEventLoop(fakeConf, eventCh, fakeUpdater, zap.New()) + fakeServiceStore = &statefakes.FakeServiceStore{} + ctrl = events.NewEventLoop(fakeConf, fakeServiceStore, eventCh, fakeUpdater, zap.New()) var ctx context.Context @@ -122,6 +125,35 @@ var _ = Describe("EventLoop", func() { }) }) + Describe("Process Service events", func() { + It("should process upsert event", func() { + svc := &apiv1.Service{} + + eventCh <- &events.UpsertEvent{ + Resource: svc, + } + + Eventually(fakeServiceStore.UpsertCallCount()).Should(Equal(1)) + Eventually(func() *apiv1.Service { + return fakeServiceStore.UpsertArgsForCall(0) + }).Should(Equal(svc)) + }) + + It("should process delete event", func() { + nsname := types.NamespacedName{Namespace: "test", Name: "service"} + + eventCh <- &events.DeleteEvent{ + NamespacedName: nsname, + Type: &apiv1.Service{}, + } + + Eventually(fakeServiceStore.DeleteCallCount()).Should(Equal(1)) + Eventually(func() types.NamespacedName { + return fakeServiceStore.DeleteArgsForCall(0) + }).Should(Equal(nsname)) + }) + }) + Describe("Edge cases", func() { AfterEach(func() { cancel() diff --git a/internal/implementations/service/service.go b/internal/implementations/service/service.go new file mode 100644 index 0000000000..0bac6140c5 --- /dev/null +++ b/internal/implementations/service/service.go @@ -0,0 +1,51 @@ +package service + +import ( + "github.com/go-logr/logr" + "github.com/nginxinc/nginx-gateway-kubernetes/internal/config" + "github.com/nginxinc/nginx-gateway-kubernetes/internal/events" + "github.com/nginxinc/nginx-gateway-kubernetes/pkg/sdk" + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +type serviceImplementation struct { + conf config.Config + eventCh chan<- interface{} +} + +// TO-DO: serviceImplementation looks similar to httpRouteImplemenation +// consider if it is possible to reduce the amount of code. + +// NewServiceImplementation creates a new ServiceImplementation. +func NewServiceImplementation(cfg config.Config, eventCh chan<- interface{}) sdk.ServiceImpl { + return &serviceImplementation{ + conf: cfg, + eventCh: eventCh, + } +} + +func (impl *serviceImplementation) Logger() logr.Logger { + return impl.conf.Logger +} + +func (impl *serviceImplementation) Upsert(svc *apiv1.Service) { + impl.Logger().Info("Service was upserted", + "namespace", svc.Namespace, "name", svc.Name, + ) + + impl.eventCh <- &events.UpsertEvent{ + Resource: svc, + } +} + +func (impl *serviceImplementation) Remove(nsname types.NamespacedName) { + impl.Logger().Info("Service resource was removed", + "namespace", nsname.Namespace, "name", nsname.Name, + ) + + impl.eventCh <- &events.DeleteEvent{ + NamespacedName: nsname, + Type: &apiv1.Service{}, + } +} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index a88df40d4d..67b435dc64 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -10,10 +10,12 @@ import ( gc "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/gatewayclass" gcfg "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/gatewayconfig" hr "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/httproute" + svc "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/service" "github.com/nginxinc/nginx-gateway-kubernetes/internal/state" "github.com/nginxinc/nginx-gateway-kubernetes/internal/status" nginxgwv1alpha1 "github.com/nginxinc/nginx-gateway-kubernetes/pkg/apis/gateway/v1alpha1" "github.com/nginxinc/nginx-gateway-kubernetes/pkg/sdk" + apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ctlr "sigs.k8s.io/controller-runtime" @@ -27,8 +29,10 @@ const clusterTimeout = 10 * time.Second var scheme = runtime.NewScheme() func init() { + // TO-DO: handle errors returned by the calls bellow _ = gatewayv1alpha2.AddToScheme(scheme) _ = nginxgwv1alpha1.AddToScheme(scheme) + _ = apiv1.AddToScheme(scheme) } func Start(cfg config.Config) error { @@ -64,10 +68,15 @@ func Start(cfg config.Config) error { if err != nil { return fmt.Errorf("cannot register httproute implementation: %w", err) } + err = sdk.RegisterServiceController(mgr, svc.NewServiceImplementation(cfg, eventCh)) + if err != nil { + return fmt.Errorf("cannot register service implementation: %w", err) + } conf := state.NewConfiguration(cfg.GatewayCtlrName, state.NewRealClock()) + serviceStore := state.NewServiceStore() reporter := status.NewUpdater(mgr.GetClient(), cfg.Logger) - eventLoop := events.NewEventLoop(conf, eventCh, reporter, cfg.Logger) + eventLoop := events.NewEventLoop(conf, serviceStore, eventCh, reporter, cfg.Logger) err = mgr.Add(eventLoop) if err != nil { diff --git a/internal/state/services.go b/internal/state/services.go new file mode 100644 index 0000000000..4b811bd58d --- /dev/null +++ b/internal/state/services.go @@ -0,0 +1,54 @@ +package state + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ServiceStore + +// ServiceStore stores services and can be queried for the cluster IP of a service. +type ServiceStore interface { + // Upsert upserts the service into the store. + Upsert(svc *v1.Service) + // Delete deletes the service from the store. + Delete(nsname types.NamespacedName) + // Resolve returns the cluster IP the service specified by its namespace and name. + // If the service doesn't have a cluster IP or it doesn't exist, resolve will return an error. + // TO-DO: later, we will start using the Endpoints rather than cluster IPs. + Resolve(nsname types.NamespacedName) (string, error) +} + +// NewServiceStore creates a new ServiceStore. +func NewServiceStore() ServiceStore { + return &serviceStoreImpl{ + services: make(map[string]*v1.Service), + } +} + +type serviceStoreImpl struct { + services map[string]*v1.Service +} + +func (s *serviceStoreImpl) Upsert(svc *v1.Service) { + s.services[getResourceKey(&svc.ObjectMeta)] = svc +} + +func (s *serviceStoreImpl) Delete(nsname types.NamespacedName) { + delete(s.services, nsname.String()) +} + +func (s *serviceStoreImpl) Resolve(nsname types.NamespacedName) (string, error) { + svc, exist := s.services[nsname.String()] + if !exist { + return "", fmt.Errorf("service %s doesn't exist", nsname.String()) + } + + if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { + return "", fmt.Errorf("service %s doesn't have ClusterIP", nsname.String()) + } + + return svc.Spec.ClusterIP, nil +} diff --git a/internal/state/services_test.go b/internal/state/services_test.go new file mode 100644 index 0000000000..6e0905c9a4 --- /dev/null +++ b/internal/state/services_test.go @@ -0,0 +1,104 @@ +package state + +import ( + . "github.com/onsi/ginkgo/v2" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + . "github.com/onsi/gomega" +) + +var _ = Describe("ServiceStore", func() { + var store ServiceStore + + BeforeEach(OncePerOrdered, func() { + store = NewServiceStore() + }) + + Describe("Resolve Service", Ordered, func() { + var svc *apiv1.Service + var svcUpdated *apiv1.Service + + BeforeAll(func() { + svc = &apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "service1", + }, + Spec: apiv1.ServiceSpec{ + ClusterIP: "10.0.0.1", + }, + } + + svcUpdated = svc.DeepCopy() + // In reality, ClusterIP cannot change for a regular service. + // However, it is ok to use this change to test the Upsert function. + svcUpdated.Spec.ClusterIP = "10.0.0.2" + }) + + It("should add a service", func() { + store.Upsert(svc) + }) + + It("should resolve the service", func() { + address, err := store.Resolve(types.NamespacedName{Namespace: "test", Name: "service1"}) + + Expect(address).To(Equal("10.0.0.1")) + Expect(err).To(BeNil()) + }) + + It("should update the service", func() { + store.Upsert(svcUpdated) + }) + + It("should resolve the updated service", func() { + address, err := store.Resolve(types.NamespacedName{Namespace: "test", Name: "service1"}) + + Expect(address).To(Equal("10.0.0.2")) + Expect(err).To(BeNil()) + + }) + + It("should delete the service", func() { + store.Delete(types.NamespacedName{Namespace: "test", Name: "service1"}) + }) + + It("should fail to resolve the service", func() { + _, err := store.Resolve(types.NamespacedName{Namespace: "test", Name: "service1"}) + + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("Edge cases", func() { + BeforeEach(func() { + store.Upsert(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "empty-ip", + }, + }) + + store.Upsert(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "none-ip", + }, + Spec: apiv1.ServiceSpec{ + ClusterIP: "None", + }, + }) + }) + DescribeTable("Resolve returns error", + func(nsname types.NamespacedName) { + _, err := store.Resolve(nsname) + + Expect(err).To(HaveOccurred()) + }, + Entry("cluster ip is empty", types.NamespacedName{Namespace: "test", Name: "empty-ip"}), + Entry("cluster ip is none", types.NamespacedName{Namespace: "test", Name: "none-ip"}), + Entry("service doesn't exist", types.NamespacedName{Namespace: "test", Name: "service"}), + ) + }) +}) diff --git a/internal/state/statefakes/fake_service_store.go b/internal/state/statefakes/fake_service_store.go new file mode 100644 index 0000000000..9042fbfcd6 --- /dev/null +++ b/internal/state/statefakes/fake_service_store.go @@ -0,0 +1,196 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package statefakes + +import ( + "sync" + + "github.com/nginxinc/nginx-gateway-kubernetes/internal/state" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +type FakeServiceStore struct { + DeleteStub func(types.NamespacedName) + deleteMutex sync.RWMutex + deleteArgsForCall []struct { + arg1 types.NamespacedName + } + ResolveStub func(types.NamespacedName) (string, error) + resolveMutex sync.RWMutex + resolveArgsForCall []struct { + arg1 types.NamespacedName + } + resolveReturns struct { + result1 string + result2 error + } + resolveReturnsOnCall map[int]struct { + result1 string + result2 error + } + UpsertStub func(*v1.Service) + upsertMutex sync.RWMutex + upsertArgsForCall []struct { + arg1 *v1.Service + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeServiceStore) Delete(arg1 types.NamespacedName) { + fake.deleteMutex.Lock() + fake.deleteArgsForCall = append(fake.deleteArgsForCall, struct { + arg1 types.NamespacedName + }{arg1}) + stub := fake.DeleteStub + fake.recordInvocation("Delete", []interface{}{arg1}) + fake.deleteMutex.Unlock() + if stub != nil { + fake.DeleteStub(arg1) + } +} + +func (fake *FakeServiceStore) DeleteCallCount() int { + fake.deleteMutex.RLock() + defer fake.deleteMutex.RUnlock() + return len(fake.deleteArgsForCall) +} + +func (fake *FakeServiceStore) DeleteCalls(stub func(types.NamespacedName)) { + fake.deleteMutex.Lock() + defer fake.deleteMutex.Unlock() + fake.DeleteStub = stub +} + +func (fake *FakeServiceStore) DeleteArgsForCall(i int) types.NamespacedName { + fake.deleteMutex.RLock() + defer fake.deleteMutex.RUnlock() + argsForCall := fake.deleteArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceStore) Resolve(arg1 types.NamespacedName) (string, error) { + fake.resolveMutex.Lock() + ret, specificReturn := fake.resolveReturnsOnCall[len(fake.resolveArgsForCall)] + fake.resolveArgsForCall = append(fake.resolveArgsForCall, struct { + arg1 types.NamespacedName + }{arg1}) + stub := fake.ResolveStub + fakeReturns := fake.resolveReturns + fake.recordInvocation("Resolve", []interface{}{arg1}) + fake.resolveMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeServiceStore) ResolveCallCount() int { + fake.resolveMutex.RLock() + defer fake.resolveMutex.RUnlock() + return len(fake.resolveArgsForCall) +} + +func (fake *FakeServiceStore) ResolveCalls(stub func(types.NamespacedName) (string, error)) { + fake.resolveMutex.Lock() + defer fake.resolveMutex.Unlock() + fake.ResolveStub = stub +} + +func (fake *FakeServiceStore) ResolveArgsForCall(i int) types.NamespacedName { + fake.resolveMutex.RLock() + defer fake.resolveMutex.RUnlock() + argsForCall := fake.resolveArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceStore) ResolveReturns(result1 string, result2 error) { + fake.resolveMutex.Lock() + defer fake.resolveMutex.Unlock() + fake.ResolveStub = nil + fake.resolveReturns = struct { + result1 string + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) ResolveReturnsOnCall(i int, result1 string, result2 error) { + fake.resolveMutex.Lock() + defer fake.resolveMutex.Unlock() + fake.ResolveStub = nil + if fake.resolveReturnsOnCall == nil { + fake.resolveReturnsOnCall = make(map[int]struct { + result1 string + result2 error + }) + } + fake.resolveReturnsOnCall[i] = struct { + result1 string + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) Upsert(arg1 *v1.Service) { + fake.upsertMutex.Lock() + fake.upsertArgsForCall = append(fake.upsertArgsForCall, struct { + arg1 *v1.Service + }{arg1}) + stub := fake.UpsertStub + fake.recordInvocation("Upsert", []interface{}{arg1}) + fake.upsertMutex.Unlock() + if stub != nil { + fake.UpsertStub(arg1) + } +} + +func (fake *FakeServiceStore) UpsertCallCount() int { + fake.upsertMutex.RLock() + defer fake.upsertMutex.RUnlock() + return len(fake.upsertArgsForCall) +} + +func (fake *FakeServiceStore) UpsertCalls(stub func(*v1.Service)) { + fake.upsertMutex.Lock() + defer fake.upsertMutex.Unlock() + fake.UpsertStub = stub +} + +func (fake *FakeServiceStore) UpsertArgsForCall(i int) *v1.Service { + fake.upsertMutex.RLock() + defer fake.upsertMutex.RUnlock() + argsForCall := fake.upsertArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceStore) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.deleteMutex.RLock() + defer fake.deleteMutex.RUnlock() + fake.resolveMutex.RLock() + defer fake.resolveMutex.RUnlock() + fake.upsertMutex.RLock() + defer fake.upsertMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeServiceStore) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ state.ServiceStore = new(FakeServiceStore) diff --git a/pkg/sdk/interfaces.go b/pkg/sdk/interfaces.go index 0dcba68f30..b588778555 100644 --- a/pkg/sdk/interfaces.go +++ b/pkg/sdk/interfaces.go @@ -2,6 +2,7 @@ package sdk import ( nginxgwv1alpha1 "github.com/nginxinc/nginx-gateway-kubernetes/pkg/apis/gateway/v1alpha1" + apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api/apis/v1alpha2" ) @@ -26,3 +27,8 @@ type HTTPRouteImpl interface { // TO-DO: change other interfaces to use types.NamespacedName Remove(types.NamespacedName) } + +type ServiceImpl interface { + Upsert(svc *apiv1.Service) + Remove(name types.NamespacedName) +} diff --git a/pkg/sdk/service_controller.go b/pkg/sdk/service_controller.go new file mode 100644 index 0000000000..3d28a25648 --- /dev/null +++ b/pkg/sdk/service_controller.go @@ -0,0 +1,62 @@ +package sdk + +import ( + "context" + + apiv1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + ctlr "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type serviceReconciler struct { + client.Client + scheme *runtime.Scheme + impl ServiceImpl +} + +// RegisterServiceController registers the ServiceController in the manager. +func RegisterServiceController(mgr manager.Manager, impl ServiceImpl) error { + r := &serviceReconciler{ + Client: mgr.GetClient(), + scheme: mgr.GetScheme(), + impl: impl, + } + + return ctlr.NewControllerManagedBy(mgr). + For(&apiv1.Service{}). + Complete(r) +} + +func (r *serviceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := log.FromContext(ctx).WithValues("service", req.NamespacedName) + + log.V(3).Info("Reconciling Service") + + found := true + var svc apiv1.Service + err := r.Get(ctx, req.NamespacedName, &svc) + if err != nil { + if !apierrors.IsNotFound(err) { + log.Error(err, "Failed to get Service") + return reconcile.Result{}, err + } + found = false + } + + if !found { + log.V(3).Info("Removing Service") + + r.impl.Remove(req.NamespacedName) + return reconcile.Result{}, nil + } + + log.V(3).Info("Upserting Service") + + r.impl.Upsert(&svc) + return reconcile.Result{}, nil +} From 0610af62746222b05a5cf069338d6a892208fdae Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Fri, 11 Mar 2022 15:55:00 -0800 Subject: [PATCH 2/2] Fix a bug and race cond in test --- internal/events/loop.go | 1 + internal/events/loop_test.go | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/events/loop.go b/internal/events/loop.go index e6fd6c5947..0c4991b3ef 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -98,6 +98,7 @@ func (el *EventLoop) propagateDelete(e *DeleteEvent) ([]state.Change, []state.St case *apiv1.Service: el.serviceStore.Delete(e.NamespacedName) // TO-DO: make sure the affected hosts are updated + return nil, nil, nil } // TO-DO: panic diff --git a/internal/events/loop_test.go b/internal/events/loop_test.go index e0cee345b3..c1dc6514af 100644 --- a/internal/events/loop_test.go +++ b/internal/events/loop_test.go @@ -126,6 +126,14 @@ var _ = Describe("EventLoop", func() { }) Describe("Process Service events", func() { + AfterEach(func() { + cancel() + + var err error + Eventually(errorCh).Should(Receive(&err)) + Expect(err).To(BeNil()) + }) + It("should process upsert event", func() { svc := &apiv1.Service{} @@ -133,7 +141,7 @@ var _ = Describe("EventLoop", func() { Resource: svc, } - Eventually(fakeServiceStore.UpsertCallCount()).Should(Equal(1)) + Eventually(fakeServiceStore.UpsertCallCount).Should(Equal(1)) Eventually(func() *apiv1.Service { return fakeServiceStore.UpsertArgsForCall(0) }).Should(Equal(svc)) @@ -147,7 +155,7 @@ var _ = Describe("EventLoop", func() { Type: &apiv1.Service{}, } - Eventually(fakeServiceStore.DeleteCallCount()).Should(Equal(1)) + Eventually(fakeServiceStore.DeleteCallCount).Should(Equal(1)) Eventually(func() types.NamespacedName { return fakeServiceStore.DeleteArgsForCall(0) }).Should(Equal(nsname))