diff --git a/deploy/manifests/nginx-gateway.yaml b/deploy/manifests/nginx-gateway.yaml index fa7a773b8e..defad3a0ff 100644 --- a/deploy/manifests/nginx-gateway.yaml +++ b/deploy/manifests/nginx-gateway.yaml @@ -14,6 +14,13 @@ apiVersion: rbac.authorization.k8s.io/v1 metadata: name: nginx-gateway rules: +- apiGroups: + - "" + resources: + - services + verbs: + - list + - watch - apiGroups: - gateway.networking.k8s.io resources: diff --git a/examples/cafe-example/cafe.yaml b/examples/cafe-example/cafe.yaml new file mode 100644 index 0000000000..d723ab0b7b --- /dev/null +++ b/examples/cafe-example/cafe.yaml @@ -0,0 +1,66 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: coffee +spec: + replicas: 1 + selector: + matchLabels: + app: coffee + template: + metadata: + labels: + app: coffee + spec: + containers: + - name: coffee + image: nginxdemos/nginx-hello:plain-text + ports: + - containerPort: 8080 +--- +apiVersion: v1 +kind: Service +metadata: + name: coffee +spec: + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + name: http + selector: + app: coffee +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: tea +spec: + replicas: 1 + selector: + matchLabels: + app: tea + template: + metadata: + labels: + app: tea + spec: + containers: + - name: tea + image: nginxdemos/nginx-hello:plain-text + ports: + - containerPort: 8080 +--- +apiVersion: v1 +kind: Service +metadata: + name: tea + labels: +spec: + ports: + - port: 80 + targetPort: 8080 + protocol: TCP + name: http + selector: + app: tea diff --git a/internal/events/loop.go b/internal/events/loop.go index 4c9e71ebfd..0c4991b3ef 100644 --- a/internal/events/loop.go +++ b/internal/events/loop.go @@ -7,21 +7,26 @@ import ( "github.com/go-logr/logr" "github.com/nginxinc/nginx-gateway-kubernetes/internal/state" "github.com/nginxinc/nginx-gateway-kubernetes/internal/status" + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api/apis/v1alpha2" ) // EventLoop is the main event loop of the Gateway. type EventLoop struct { conf state.Configuration + serviceStore state.ServiceStore eventCh <-chan interface{} logger logr.Logger statusUpdater status.Updater } // NewEventLoop creates a new EventLoop. -func NewEventLoop(conf state.Configuration, eventCh <-chan interface{}, statusUpdater status.Updater, logger logr.Logger) *EventLoop { +func NewEventLoop(conf state.Configuration, serviceStore state.ServiceStore, eventCh <-chan interface{}, + statusUpdater status.Updater, logger logr.Logger) *EventLoop { return &EventLoop{ conf: conf, + serviceStore: serviceStore, eventCh: eventCh, statusUpdater: statusUpdater, logger: logger.WithName("eventLoop"), @@ -75,8 +80,13 @@ func (el *EventLoop) propagateUpsert(e *UpsertEvent) ([]state.Change, []state.St case *v1alpha2.HTTPRoute: changes, statusUpdates := el.conf.UpsertHTTPRoute(r) return changes, statusUpdates, nil + case *apiv1.Service: + el.serviceStore.Upsert(r) + // TO-DO: make sure the affected hosts are updated + return nil, nil, nil } + // TO-DO: panic return nil, nil, fmt.Errorf("unknown resource type %T", e.Resource) } @@ -85,8 +95,13 @@ func (el *EventLoop) propagateDelete(e *DeleteEvent) ([]state.Change, []state.St case *v1alpha2.HTTPRoute: changes, statusUpdates := el.conf.DeleteHTTPRoute(e.NamespacedName) return changes, statusUpdates, nil + case *apiv1.Service: + el.serviceStore.Delete(e.NamespacedName) + // TO-DO: make sure the affected hosts are updated + return nil, nil, nil } + // TO-DO: panic return nil, nil, fmt.Errorf("unknown resource type %T", e.Type) } @@ -97,6 +112,43 @@ func (el *EventLoop) processChangesAndStatusUpdates(ctx context.Context, changes // TO-DO: This code is temporary. We will remove it once we have a component that processes changes. fmt.Printf("%+v\n", c) + + if c.Op == state.Upsert { + // The code below resolves service backend refs into their cluster IPs + // TO-DO: this code will be removed once we have the component that generates NGINX config and + // uses the ServiceStore to resolve services. + for _, g := range c.Host.PathRouteGroups { + for _, r := range g.Routes { + for _, b := range r.Source.Spec.Rules[r.RuleIdx].BackendRefs { + if b.BackendRef.Kind == nil || *b.BackendRef.Kind == "Service" { + ns := r.Source.Namespace + if b.BackendRef.Namespace != nil { + ns = string(*b.BackendRef.Namespace) + } + + address, err := el.serviceStore.Resolve(types.NamespacedName{ + Namespace: ns, + Name: string(b.BackendRef.Name), + }) + + if err != nil { + fmt.Printf("Service %s/%s error: %v\n", ns, b.BackendRef.Name, err) + continue + } + + var port int32 = 80 + if b.BackendRef.Port != nil { + port = int32(*b.BackendRef.Port) + } + + address = fmt.Sprintf("%s:%d", address, port) + + fmt.Printf("Service %s/%s: %s\n", ns, b.BackendRef.Name, address) + } + } + } + } + } } el.statusUpdater.ProcessStatusUpdates(ctx, updates) diff --git a/internal/events/loop_test.go b/internal/events/loop_test.go index 24a7498865..c1dc6514af 100644 --- a/internal/events/loop_test.go +++ b/internal/events/loop_test.go @@ -9,6 +9,7 @@ import ( "github.com/nginxinc/nginx-gateway-kubernetes/internal/status/statusfakes" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -33,6 +34,7 @@ var _ = Describe("EventLoop", func() { var ctrl *events.EventLoop var fakeConf *statefakes.FakeConfiguration var fakeUpdater *statusfakes.FakeUpdater + var fakeServiceStore *statefakes.FakeServiceStore var cancel context.CancelFunc var eventCh chan interface{} var errorCh chan error @@ -41,7 +43,8 @@ var _ = Describe("EventLoop", func() { fakeConf = &statefakes.FakeConfiguration{} eventCh = make(chan interface{}) fakeUpdater = &statusfakes.FakeUpdater{} - ctrl = events.NewEventLoop(fakeConf, eventCh, fakeUpdater, zap.New()) + fakeServiceStore = &statefakes.FakeServiceStore{} + ctrl = events.NewEventLoop(fakeConf, fakeServiceStore, eventCh, fakeUpdater, zap.New()) var ctx context.Context @@ -122,6 +125,43 @@ var _ = Describe("EventLoop", func() { }) }) + Describe("Process Service events", func() { + AfterEach(func() { + cancel() + + var err error + Eventually(errorCh).Should(Receive(&err)) + Expect(err).To(BeNil()) + }) + + It("should process upsert event", func() { + svc := &apiv1.Service{} + + eventCh <- &events.UpsertEvent{ + Resource: svc, + } + + Eventually(fakeServiceStore.UpsertCallCount).Should(Equal(1)) + Eventually(func() *apiv1.Service { + return fakeServiceStore.UpsertArgsForCall(0) + }).Should(Equal(svc)) + }) + + It("should process delete event", func() { + nsname := types.NamespacedName{Namespace: "test", Name: "service"} + + eventCh <- &events.DeleteEvent{ + NamespacedName: nsname, + Type: &apiv1.Service{}, + } + + Eventually(fakeServiceStore.DeleteCallCount).Should(Equal(1)) + Eventually(func() types.NamespacedName { + return fakeServiceStore.DeleteArgsForCall(0) + }).Should(Equal(nsname)) + }) + }) + Describe("Edge cases", func() { AfterEach(func() { cancel() diff --git a/internal/implementations/service/service.go b/internal/implementations/service/service.go new file mode 100644 index 0000000000..0bac6140c5 --- /dev/null +++ b/internal/implementations/service/service.go @@ -0,0 +1,51 @@ +package service + +import ( + "github.com/go-logr/logr" + "github.com/nginxinc/nginx-gateway-kubernetes/internal/config" + "github.com/nginxinc/nginx-gateway-kubernetes/internal/events" + "github.com/nginxinc/nginx-gateway-kubernetes/pkg/sdk" + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +type serviceImplementation struct { + conf config.Config + eventCh chan<- interface{} +} + +// TO-DO: serviceImplementation looks similar to httpRouteImplemenation +// consider if it is possible to reduce the amount of code. + +// NewServiceImplementation creates a new ServiceImplementation. +func NewServiceImplementation(cfg config.Config, eventCh chan<- interface{}) sdk.ServiceImpl { + return &serviceImplementation{ + conf: cfg, + eventCh: eventCh, + } +} + +func (impl *serviceImplementation) Logger() logr.Logger { + return impl.conf.Logger +} + +func (impl *serviceImplementation) Upsert(svc *apiv1.Service) { + impl.Logger().Info("Service was upserted", + "namespace", svc.Namespace, "name", svc.Name, + ) + + impl.eventCh <- &events.UpsertEvent{ + Resource: svc, + } +} + +func (impl *serviceImplementation) Remove(nsname types.NamespacedName) { + impl.Logger().Info("Service resource was removed", + "namespace", nsname.Namespace, "name", nsname.Name, + ) + + impl.eventCh <- &events.DeleteEvent{ + NamespacedName: nsname, + Type: &apiv1.Service{}, + } +} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index a88df40d4d..67b435dc64 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -10,10 +10,12 @@ import ( gc "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/gatewayclass" gcfg "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/gatewayconfig" hr "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/httproute" + svc "github.com/nginxinc/nginx-gateway-kubernetes/internal/implementations/service" "github.com/nginxinc/nginx-gateway-kubernetes/internal/state" "github.com/nginxinc/nginx-gateway-kubernetes/internal/status" nginxgwv1alpha1 "github.com/nginxinc/nginx-gateway-kubernetes/pkg/apis/gateway/v1alpha1" "github.com/nginxinc/nginx-gateway-kubernetes/pkg/sdk" + apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ctlr "sigs.k8s.io/controller-runtime" @@ -27,8 +29,10 @@ const clusterTimeout = 10 * time.Second var scheme = runtime.NewScheme() func init() { + // TO-DO: handle errors returned by the calls bellow _ = gatewayv1alpha2.AddToScheme(scheme) _ = nginxgwv1alpha1.AddToScheme(scheme) + _ = apiv1.AddToScheme(scheme) } func Start(cfg config.Config) error { @@ -64,10 +68,15 @@ func Start(cfg config.Config) error { if err != nil { return fmt.Errorf("cannot register httproute implementation: %w", err) } + err = sdk.RegisterServiceController(mgr, svc.NewServiceImplementation(cfg, eventCh)) + if err != nil { + return fmt.Errorf("cannot register service implementation: %w", err) + } conf := state.NewConfiguration(cfg.GatewayCtlrName, state.NewRealClock()) + serviceStore := state.NewServiceStore() reporter := status.NewUpdater(mgr.GetClient(), cfg.Logger) - eventLoop := events.NewEventLoop(conf, eventCh, reporter, cfg.Logger) + eventLoop := events.NewEventLoop(conf, serviceStore, eventCh, reporter, cfg.Logger) err = mgr.Add(eventLoop) if err != nil { diff --git a/internal/state/services.go b/internal/state/services.go new file mode 100644 index 0000000000..4b811bd58d --- /dev/null +++ b/internal/state/services.go @@ -0,0 +1,54 @@ +package state + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . ServiceStore + +// ServiceStore stores services and can be queried for the cluster IP of a service. +type ServiceStore interface { + // Upsert upserts the service into the store. + Upsert(svc *v1.Service) + // Delete deletes the service from the store. + Delete(nsname types.NamespacedName) + // Resolve returns the cluster IP the service specified by its namespace and name. + // If the service doesn't have a cluster IP or it doesn't exist, resolve will return an error. + // TO-DO: later, we will start using the Endpoints rather than cluster IPs. + Resolve(nsname types.NamespacedName) (string, error) +} + +// NewServiceStore creates a new ServiceStore. +func NewServiceStore() ServiceStore { + return &serviceStoreImpl{ + services: make(map[string]*v1.Service), + } +} + +type serviceStoreImpl struct { + services map[string]*v1.Service +} + +func (s *serviceStoreImpl) Upsert(svc *v1.Service) { + s.services[getResourceKey(&svc.ObjectMeta)] = svc +} + +func (s *serviceStoreImpl) Delete(nsname types.NamespacedName) { + delete(s.services, nsname.String()) +} + +func (s *serviceStoreImpl) Resolve(nsname types.NamespacedName) (string, error) { + svc, exist := s.services[nsname.String()] + if !exist { + return "", fmt.Errorf("service %s doesn't exist", nsname.String()) + } + + if svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { + return "", fmt.Errorf("service %s doesn't have ClusterIP", nsname.String()) + } + + return svc.Spec.ClusterIP, nil +} diff --git a/internal/state/services_test.go b/internal/state/services_test.go new file mode 100644 index 0000000000..6e0905c9a4 --- /dev/null +++ b/internal/state/services_test.go @@ -0,0 +1,104 @@ +package state + +import ( + . "github.com/onsi/ginkgo/v2" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + . "github.com/onsi/gomega" +) + +var _ = Describe("ServiceStore", func() { + var store ServiceStore + + BeforeEach(OncePerOrdered, func() { + store = NewServiceStore() + }) + + Describe("Resolve Service", Ordered, func() { + var svc *apiv1.Service + var svcUpdated *apiv1.Service + + BeforeAll(func() { + svc = &apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "service1", + }, + Spec: apiv1.ServiceSpec{ + ClusterIP: "10.0.0.1", + }, + } + + svcUpdated = svc.DeepCopy() + // In reality, ClusterIP cannot change for a regular service. + // However, it is ok to use this change to test the Upsert function. + svcUpdated.Spec.ClusterIP = "10.0.0.2" + }) + + It("should add a service", func() { + store.Upsert(svc) + }) + + It("should resolve the service", func() { + address, err := store.Resolve(types.NamespacedName{Namespace: "test", Name: "service1"}) + + Expect(address).To(Equal("10.0.0.1")) + Expect(err).To(BeNil()) + }) + + It("should update the service", func() { + store.Upsert(svcUpdated) + }) + + It("should resolve the updated service", func() { + address, err := store.Resolve(types.NamespacedName{Namespace: "test", Name: "service1"}) + + Expect(address).To(Equal("10.0.0.2")) + Expect(err).To(BeNil()) + + }) + + It("should delete the service", func() { + store.Delete(types.NamespacedName{Namespace: "test", Name: "service1"}) + }) + + It("should fail to resolve the service", func() { + _, err := store.Resolve(types.NamespacedName{Namespace: "test", Name: "service1"}) + + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("Edge cases", func() { + BeforeEach(func() { + store.Upsert(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "empty-ip", + }, + }) + + store.Upsert(&apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + Name: "none-ip", + }, + Spec: apiv1.ServiceSpec{ + ClusterIP: "None", + }, + }) + }) + DescribeTable("Resolve returns error", + func(nsname types.NamespacedName) { + _, err := store.Resolve(nsname) + + Expect(err).To(HaveOccurred()) + }, + Entry("cluster ip is empty", types.NamespacedName{Namespace: "test", Name: "empty-ip"}), + Entry("cluster ip is none", types.NamespacedName{Namespace: "test", Name: "none-ip"}), + Entry("service doesn't exist", types.NamespacedName{Namespace: "test", Name: "service"}), + ) + }) +}) diff --git a/internal/state/statefakes/fake_service_store.go b/internal/state/statefakes/fake_service_store.go new file mode 100644 index 0000000000..9042fbfcd6 --- /dev/null +++ b/internal/state/statefakes/fake_service_store.go @@ -0,0 +1,196 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package statefakes + +import ( + "sync" + + "github.com/nginxinc/nginx-gateway-kubernetes/internal/state" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" +) + +type FakeServiceStore struct { + DeleteStub func(types.NamespacedName) + deleteMutex sync.RWMutex + deleteArgsForCall []struct { + arg1 types.NamespacedName + } + ResolveStub func(types.NamespacedName) (string, error) + resolveMutex sync.RWMutex + resolveArgsForCall []struct { + arg1 types.NamespacedName + } + resolveReturns struct { + result1 string + result2 error + } + resolveReturnsOnCall map[int]struct { + result1 string + result2 error + } + UpsertStub func(*v1.Service) + upsertMutex sync.RWMutex + upsertArgsForCall []struct { + arg1 *v1.Service + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeServiceStore) Delete(arg1 types.NamespacedName) { + fake.deleteMutex.Lock() + fake.deleteArgsForCall = append(fake.deleteArgsForCall, struct { + arg1 types.NamespacedName + }{arg1}) + stub := fake.DeleteStub + fake.recordInvocation("Delete", []interface{}{arg1}) + fake.deleteMutex.Unlock() + if stub != nil { + fake.DeleteStub(arg1) + } +} + +func (fake *FakeServiceStore) DeleteCallCount() int { + fake.deleteMutex.RLock() + defer fake.deleteMutex.RUnlock() + return len(fake.deleteArgsForCall) +} + +func (fake *FakeServiceStore) DeleteCalls(stub func(types.NamespacedName)) { + fake.deleteMutex.Lock() + defer fake.deleteMutex.Unlock() + fake.DeleteStub = stub +} + +func (fake *FakeServiceStore) DeleteArgsForCall(i int) types.NamespacedName { + fake.deleteMutex.RLock() + defer fake.deleteMutex.RUnlock() + argsForCall := fake.deleteArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceStore) Resolve(arg1 types.NamespacedName) (string, error) { + fake.resolveMutex.Lock() + ret, specificReturn := fake.resolveReturnsOnCall[len(fake.resolveArgsForCall)] + fake.resolveArgsForCall = append(fake.resolveArgsForCall, struct { + arg1 types.NamespacedName + }{arg1}) + stub := fake.ResolveStub + fakeReturns := fake.resolveReturns + fake.recordInvocation("Resolve", []interface{}{arg1}) + fake.resolveMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeServiceStore) ResolveCallCount() int { + fake.resolveMutex.RLock() + defer fake.resolveMutex.RUnlock() + return len(fake.resolveArgsForCall) +} + +func (fake *FakeServiceStore) ResolveCalls(stub func(types.NamespacedName) (string, error)) { + fake.resolveMutex.Lock() + defer fake.resolveMutex.Unlock() + fake.ResolveStub = stub +} + +func (fake *FakeServiceStore) ResolveArgsForCall(i int) types.NamespacedName { + fake.resolveMutex.RLock() + defer fake.resolveMutex.RUnlock() + argsForCall := fake.resolveArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceStore) ResolveReturns(result1 string, result2 error) { + fake.resolveMutex.Lock() + defer fake.resolveMutex.Unlock() + fake.ResolveStub = nil + fake.resolveReturns = struct { + result1 string + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) ResolveReturnsOnCall(i int, result1 string, result2 error) { + fake.resolveMutex.Lock() + defer fake.resolveMutex.Unlock() + fake.ResolveStub = nil + if fake.resolveReturnsOnCall == nil { + fake.resolveReturnsOnCall = make(map[int]struct { + result1 string + result2 error + }) + } + fake.resolveReturnsOnCall[i] = struct { + result1 string + result2 error + }{result1, result2} +} + +func (fake *FakeServiceStore) Upsert(arg1 *v1.Service) { + fake.upsertMutex.Lock() + fake.upsertArgsForCall = append(fake.upsertArgsForCall, struct { + arg1 *v1.Service + }{arg1}) + stub := fake.UpsertStub + fake.recordInvocation("Upsert", []interface{}{arg1}) + fake.upsertMutex.Unlock() + if stub != nil { + fake.UpsertStub(arg1) + } +} + +func (fake *FakeServiceStore) UpsertCallCount() int { + fake.upsertMutex.RLock() + defer fake.upsertMutex.RUnlock() + return len(fake.upsertArgsForCall) +} + +func (fake *FakeServiceStore) UpsertCalls(stub func(*v1.Service)) { + fake.upsertMutex.Lock() + defer fake.upsertMutex.Unlock() + fake.UpsertStub = stub +} + +func (fake *FakeServiceStore) UpsertArgsForCall(i int) *v1.Service { + fake.upsertMutex.RLock() + defer fake.upsertMutex.RUnlock() + argsForCall := fake.upsertArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeServiceStore) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.deleteMutex.RLock() + defer fake.deleteMutex.RUnlock() + fake.resolveMutex.RLock() + defer fake.resolveMutex.RUnlock() + fake.upsertMutex.RLock() + defer fake.upsertMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeServiceStore) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ state.ServiceStore = new(FakeServiceStore) diff --git a/pkg/sdk/interfaces.go b/pkg/sdk/interfaces.go index 0dcba68f30..b588778555 100644 --- a/pkg/sdk/interfaces.go +++ b/pkg/sdk/interfaces.go @@ -2,6 +2,7 @@ package sdk import ( nginxgwv1alpha1 "github.com/nginxinc/nginx-gateway-kubernetes/pkg/apis/gateway/v1alpha1" + apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api/apis/v1alpha2" ) @@ -26,3 +27,8 @@ type HTTPRouteImpl interface { // TO-DO: change other interfaces to use types.NamespacedName Remove(types.NamespacedName) } + +type ServiceImpl interface { + Upsert(svc *apiv1.Service) + Remove(name types.NamespacedName) +} diff --git a/pkg/sdk/service_controller.go b/pkg/sdk/service_controller.go new file mode 100644 index 0000000000..3d28a25648 --- /dev/null +++ b/pkg/sdk/service_controller.go @@ -0,0 +1,62 @@ +package sdk + +import ( + "context" + + apiv1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + ctlr "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type serviceReconciler struct { + client.Client + scheme *runtime.Scheme + impl ServiceImpl +} + +// RegisterServiceController registers the ServiceController in the manager. +func RegisterServiceController(mgr manager.Manager, impl ServiceImpl) error { + r := &serviceReconciler{ + Client: mgr.GetClient(), + scheme: mgr.GetScheme(), + impl: impl, + } + + return ctlr.NewControllerManagedBy(mgr). + For(&apiv1.Service{}). + Complete(r) +} + +func (r *serviceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := log.FromContext(ctx).WithValues("service", req.NamespacedName) + + log.V(3).Info("Reconciling Service") + + found := true + var svc apiv1.Service + err := r.Get(ctx, req.NamespacedName, &svc) + if err != nil { + if !apierrors.IsNotFound(err) { + log.Error(err, "Failed to get Service") + return reconcile.Result{}, err + } + found = false + } + + if !found { + log.V(3).Info("Removing Service") + + r.impl.Remove(req.NamespacedName) + return reconcile.Result{}, nil + } + + log.V(3).Info("Upserting Service") + + r.impl.Upsert(&svc) + return reconcile.Result{}, nil +}