From f1abcdc1235d2bfbc2c7e897f199b4344ce2eff6 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 11:57:36 +0100 Subject: [PATCH 01/19] refactor: addition of root ctx to main --- cmd/kar-controllers/main.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/kar-controllers/main.go b/cmd/kar-controllers/main.go index d66c4607e..9ccad9673 100644 --- a/cmd/kar-controllers/main.go +++ b/cmd/kar-controllers/main.go @@ -35,6 +35,7 @@ import ( "fmt" "os" + "k8s.io/apiserver/pkg/server" "k8s.io/klog/v2" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app" @@ -49,8 +50,15 @@ func main() { s.AddFlags(flagSet) flag.Parse() - if err := app.Run(s); err != nil { + ctx := server.SetupSignalContext() + + // Run the server + if err := app.Run(ctx, s); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } + + <-ctx.Done() + fmt.Println("Shutting down gracefully") + } From 16ad2769ebf913707fa5e4eb81d2c862af7d074a Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 11:58:23 +0100 Subject: [PATCH 02/19] refactor: addition of metrics address --- cmd/kar-controllers/app/options/options.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index 7d42288cc..c4bb6bdcf 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -40,6 +40,7 @@ type ServerOption struct { QuotaRestURL string HealthProbeListenAddr string DispatchResourceReservationTimeout int64 + MetricsListenAddr string } // NewServerOption creates a new CMServer with a default config. @@ -65,6 +66,8 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) { fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.") fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.") fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'") + // using port 8083 for metrics as 8082 is used by `custom-metrics-apiserver` + fs.StringVar(&s.MetricsListenAddr, "metricsListenAddr", ":8083", "Listen address for metrics. Defaults to ':8083'") fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes") } From 3298bbf6a192cb3d3933f8233cef889736a5ecfc Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 11:59:13 +0100 Subject: [PATCH 03/19] refactor: edit of the deployment and service to expose metrics ports --- .../mcad-controller/templates/deployment.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/deployment/mcad-controller/templates/deployment.yaml b/deployment/mcad-controller/templates/deployment.yaml index 44da801fe..0856a0b5f 100644 --- a/deployment/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-controller/templates/deployment.yaml @@ -11,9 +11,25 @@ spec: - name: http port: 80 targetPort: 8080 + - name: metrics + port: 8083 + targetPort: 8083 selector: app: custom-metrics-apiserver --- +apiVersion: v1 +kind: Service +metadata: + name: metrics + namespace: kube-system +spec: + ports: + - name: metrics + port: 8083 + targetPort: 8083 + selector: + app: metrics +--- #{{ if .Values.configMap.quotaRestUrl }} apiVersion: v1 kind: Service @@ -260,6 +276,8 @@ spec: name: https - containerPort: 8080 name: http + - containerPort: 8083 + name: metrics volumeMounts: - mountPath: /tmp name: temp-vol From 4180e7afefc52a1041307707db1c49ce9c76b120 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 12:05:33 +0100 Subject: [PATCH 04/19] refactor: edit of run to start controller and health + metrics concurrently this commit also edits the startHealthServer to also start collecting of default prometheus metrics. channels are used to collect potential errors and allow for graceful shutdown of servers --- cmd/kar-controllers/app/server.go | 118 +++++++++++++++++++++++++----- 1 file changed, 99 insertions(+), 19 deletions(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index 4deb4ac14..eae9cfa76 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -17,19 +17,27 @@ limitations under the License. package app import ( - "net/http" "strings" - - _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "context" + "fmt" + "net/http" + "time" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/utils/pointer" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" - "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" + + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + "k8s.io/utils/pointer" + + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config" ) func buildConfig(master, kubeconfig string) (*rest.Config, error) { @@ -39,14 +47,12 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) { return rest.InClusterConfig() } -func Run(opt *options.ServerOption) error { +func Run(ctx context.Context, opt *options.ServerOption) error { restConfig, err := buildConfig(opt.Master, opt.Kubeconfig) if err != nil { return err } - neverStop := make(chan struct{}) - restConfig.QPS = 100.0 restConfig.Burst = 200.0 @@ -64,27 +70,101 @@ func Run(opt *options.ServerOption) error { jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig) if jobctrl == nil { - return nil + return fmt.Errorf("failed to create a job controller") } - jobctrl.Run(neverStop) - // This call is blocking (unless an error occurs) which equates to <-neverStop - err = listenHealthProbe(opt) + stopCh := make(chan struct{}) + + go func() { + defer close(stopCh) + <-ctx.Done() + }() + + go jobctrl.Run(stopCh) + + err = startHealthAndMetricsServers(ctx, opt) if err != nil { return err } + <-ctx.Done() return nil } // Starts the health probe listener -func listenHealthProbe(opt *options.ServerOption) error { - handler := http.NewServeMux() - handler.Handle("/healthz", &health.Handler{}) - err := http.ListenAndServe(opt.HealthProbeListenAddr, handler) - if err != nil { - return err +func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error { + + // Create a new registry. + reg := prometheus.NewRegistry() + + // Add Go module build info. + reg.MustRegister(collectors.NewBuildInfoCollector()) + reg.MustRegister(collectors.NewGoCollector()) + reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + + metricsHandler := http.NewServeMux() + + // Use the HTTPErrorOnError option for the Prometheus handler + handlerOpts := promhttp.HandlerOpts{ + ErrorHandling: promhttp.HTTPErrorOnError, } - return nil + metricsHandler.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOpts)) + + healthHandler := http.NewServeMux() + healthHandler.Handle("/healthz", &health.Handler{}) + + metricsServer := &http.Server{ + Addr: opt.MetricsListenAddr, + Handler: metricsHandler, + } + + healthServer := &http.Server{ + Addr: opt.HealthProbeListenAddr, + Handler: healthHandler, + } + + // make a channel for errors for each server + metricsServerErrChan := make(chan error) + healthServerErrChan := make(chan error) + + // start servers in their own goroutines + go func() { + defer close(metricsServerErrChan) + err := metricsServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + metricsServerErrChan <- err + } + }() + + go func() { + defer close(healthServerErrChan) + err := healthServer.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + healthServerErrChan <- err + } + }() + + // use select to wait for either a shutdown signal or an error + select { + case <-ctx.Done(): + // received an OS shutdown signal, shut down servers gracefully + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + errM := metricsServer.Shutdown(ctx) + if errM != nil { + return fmt.Errorf("metrics server shutdown error: %v", errM) + } + errH := healthServer.Shutdown(ctx) + if errH != nil { + return fmt.Errorf("health server shutdown error: %v", errH) + } + case err := <-metricsServerErrChan: + return fmt.Errorf("metrics server error: %v", err) + case err := <-healthServerErrChan: + return fmt.Errorf("health server error: %v", err) + } + + return nil } From e42175c169e06b8c788ff5ca4ea13d6f51dcbee9 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 9 Aug 2023 12:22:11 +0100 Subject: [PATCH 05/19] refactor: making sure that the health and metrics servers only start shutdown once jobctrl is shut down --- cmd/kar-controllers/app/server.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index eae9cfa76..e6400140b 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -74,13 +74,22 @@ func Run(ctx context.Context, opt *options.ServerOption) error { } stopCh := make(chan struct{}) + // this channel is used to signal that the job controller is done + jobctrlDoneCh := make(chan struct{}) go func() { defer close(stopCh) <-ctx.Done() }() - go jobctrl.Run(stopCh) + go func() { + jobctrl.Run(stopCh) + // close the jobctrlDoneCh channel when the job controller is done + close(jobctrlDoneCh) + }() + + // wait for the job controller to be done before shutting down the server + <-jobctrlDoneCh err = startHealthAndMetricsServers(ctx, opt) if err != nil { From f35715d33955aad9f1f4dfbfffe6209e57916ddc Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Fri, 11 Aug 2023 14:28:06 +0100 Subject: [PATCH 06/19] refactor: update health and metric port defaults from strings to ints --- cmd/kar-controllers/app/options/options.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index c4bb6bdcf..bfc24c380 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -21,6 +21,8 @@ import ( "os" "strconv" "strings" + + klog "k8s.io/klog/v2" ) // ServerOption is the main context object for the controller manager. @@ -38,9 +40,9 @@ type ServerOption struct { HeadOfLineHoldingTime int QuotaEnabled bool // Controller is to evaluate quota per request QuotaRestURL string - HealthProbeListenAddr string + HealthProbeListenPort int DispatchResourceReservationTimeout int64 - MetricsListenAddr string + MetricsListenPort int } // NewServerOption creates a new CMServer with a default config. @@ -65,9 +67,9 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) { fs.BoolVar(&s.QuotaEnabled, "quotaEnabled", s.QuotaEnabled, "Enable quota policy evaluation. Default is false.") fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.") fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.") - fs.StringVar(&s.HealthProbeListenAddr, "healthProbeListenAddr", ":8081", "Listen address for health probes. Defaults to ':8081'") + fs.IntVar(&s.HealthProbeListenPort, "healthProbeListenPort", 8081, "Listen port for health probes. Defaults to ':8081'") // using port 8083 for metrics as 8082 is used by `custom-metrics-apiserver` - fs.StringVar(&s.MetricsListenAddr, "metricsListenAddr", ":8083", "Listen address for metrics. Defaults to ':8083'") + fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 8083, "Listen port for metrics. Defaults to ':8083'") fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes") } From a20d09d7aec8f07aa93b024593cf920769265d2d Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Fri, 11 Aug 2023 14:35:56 +0100 Subject: [PATCH 07/19] refactor: addition of a generic server to use for health, metrics etc --- cmd/kar-controllers/app/generic-server.go | 107 ++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 cmd/kar-controllers/app/generic-server.go diff --git a/cmd/kar-controllers/app/generic-server.go b/cmd/kar-controllers/app/generic-server.go new file mode 100644 index 000000000..d04da8a3e --- /dev/null +++ b/cmd/kar-controllers/app/generic-server.go @@ -0,0 +1,107 @@ +package app + +import ( + "context" + "fmt" + "net" + "net/http" + "strconv" + "time" + + logger "k8s.io/klog/v2" +) + +type ServerOption func(*Server) + +// WithTimeout sets the shutdown timeout for the server. +func WithTimeout(timeout time.Duration) ServerOption { + return func(s *Server) { + s.shutdownTimeout = timeout + } +} + +type Server struct { + httpServer http.Server + listener net.Listener + endpoint string + shutdownTimeout time.Duration +} + +func NewServer(port int, endpoint string, handler http.Handler, options ...ServerOption) (*Server, error) { + addr := "0" + if port != 0 { + addr = ":" + strconv.Itoa(port) + } + + listener, err := newListener(addr) + if err != nil { + return nil, err + } + + mux := http.NewServeMux() + mux.Handle(endpoint, handler) + + s := &Server{ + endpoint: endpoint, + listener: listener, + httpServer: http.Server{Handler: mux}, + shutdownTimeout: 30 * time.Second, // Default value + } + + for _, opt := range options { + opt(s) + } + + return s, nil +} + +func (s *Server) Start() (err error) { + if s.listener == nil { + logger.Infof("Serving endpoint %s is disabled", s.endpoint) + return + } + + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, r) + } + }() + + logger.Infof("Started serving endpoint %s at %s", s.endpoint, s.listener.Addr()) + if e := s.httpServer.Serve(s.listener); e != http.ErrServerClosed { + return fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, e) + } + return +} + +func (s *Server) Shutdown() error { + if s.listener == nil { + return nil + } + + logger.Info("Stopping server") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Try graceful shutdown + if err := s.httpServer.Shutdown(shutdownCtx); err != nil { + return fmt.Errorf("failed to shutdown server gracefully: %v", err) + } + return s.httpServer.Shutdown(shutdownCtx) +} + +// newListener creates a new TCP listener bound to the given address. +func newListener(addr string) (net.Listener, error) { + // Add a case to disable serving altogether + if addr == "0" { + return nil, nil + } + + listener, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to create listener: %v", err) + } + + return listener, nil +} From 3e986896d11a5c783cc919c1801d3d504594affc Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Fri, 11 Aug 2023 16:44:33 +0100 Subject: [PATCH 08/19] refactor: use errgroup and new generic-server --- cmd/kar-controllers/app/server.go | 132 ++++++++++-------------------- 1 file changed, 44 insertions(+), 88 deletions(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index e6400140b..b00754efb 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -21,8 +21,8 @@ import ( "context" "fmt" "net/http" - "time" + "golang.org/x/sync/errgroup" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -40,6 +40,9 @@ import ( "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config" ) +// Global Prometheus Registry +var globalPromRegistry = prometheus.NewRegistry() + func buildConfig(master, kubeconfig string) (*rest.Config, error) { if master != "" || kubeconfig != "" { return clientcmd.BuildConfigFromFlags(master, kubeconfig) @@ -73,23 +76,7 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return fmt.Errorf("failed to create a job controller") } - stopCh := make(chan struct{}) - // this channel is used to signal that the job controller is done - jobctrlDoneCh := make(chan struct{}) - - go func() { - defer close(stopCh) - <-ctx.Done() - }() - - go func() { - jobctrl.Run(stopCh) - // close the jobctrlDoneCh channel when the job controller is done - close(jobctrlDoneCh) - }() - - // wait for the job controller to be done before shutting down the server - <-jobctrlDoneCh + go jobctrl.Run(ctx.Done()) err = startHealthAndMetricsServers(ctx, opt) if err != nil { @@ -100,80 +87,49 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return nil } -// Starts the health probe listener -func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error { - - // Create a new registry. - reg := prometheus.NewRegistry() - +// metricsHandler returns a http.Handler that serves the prometheus metrics +func prometheusHandler() http.Handler { // Add Go module build info. - reg.MustRegister(collectors.NewBuildInfoCollector()) - reg.MustRegister(collectors.NewGoCollector()) - reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector()) + globalPromRegistry.MustRegister(collectors.NewGoCollector()) + globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - metricsHandler := http.NewServeMux() - - // Use the HTTPErrorOnError option for the Prometheus handler handlerOpts := promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, } - metricsHandler.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOpts)) - - healthHandler := http.NewServeMux() - healthHandler.Handle("/healthz", &health.Handler{}) - - metricsServer := &http.Server{ - Addr: opt.MetricsListenAddr, - Handler: metricsHandler, - } - - healthServer := &http.Server{ - Addr: opt.HealthProbeListenAddr, - Handler: healthHandler, - } - - // make a channel for errors for each server - metricsServerErrChan := make(chan error) - healthServerErrChan := make(chan error) - - // start servers in their own goroutines - go func() { - defer close(metricsServerErrChan) - err := metricsServer.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - metricsServerErrChan <- err - } - }() - - go func() { - defer close(healthServerErrChan) - err := healthServer.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - healthServerErrChan <- err - } - }() - - // use select to wait for either a shutdown signal or an error - select { - case <-ctx.Done(): - // received an OS shutdown signal, shut down servers gracefully - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - errM := metricsServer.Shutdown(ctx) - if errM != nil { - return fmt.Errorf("metrics server shutdown error: %v", errM) - } - errH := healthServer.Shutdown(ctx) - if errH != nil { - return fmt.Errorf("health server shutdown error: %v", errH) - } - case err := <-metricsServerErrChan: - return fmt.Errorf("metrics server error: %v", err) - case err := <-healthServerErrChan: - return fmt.Errorf("health server error: %v", err) - } - - return nil + return promhttp.HandlerFor(globalPromRegistry, handlerOpts) +} + +func healthHandler() http.Handler { + healthHandler := http.NewServeMux() + healthHandler.Handle("/healthz", &health.Handler{}) + return healthHandler +} + +// Starts the health probe listener +func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error { + g, ctx := errgroup.WithContext(ctx) + + // metrics server + metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", prometheusHandler()) + if err != nil { + return err + } + + healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler()) + if err != nil { + return err + } + + g.Go(metricsServer.Start) + g.Go(healthServer.Start) + + go func() { + <-ctx.Done() + metricsServer.Shutdown() + healthServer.Shutdown() + }() + + return nil } From 394f411b819f4b0e0d0fee99f48092463381ee32 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Mon, 14 Aug 2023 10:14:42 +0100 Subject: [PATCH 09/19] refactor: update metrics port in deployment to standard prom port --- deployment/mcad-controller/templates/deployment.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/deployment/mcad-controller/templates/deployment.yaml b/deployment/mcad-controller/templates/deployment.yaml index 0856a0b5f..dfbd3fed6 100644 --- a/deployment/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-controller/templates/deployment.yaml @@ -12,8 +12,8 @@ spec: port: 80 targetPort: 8080 - name: metrics - port: 8083 - targetPort: 8083 + port: 9090 + targetPort: 9090 selector: app: custom-metrics-apiserver --- @@ -25,8 +25,8 @@ metadata: spec: ports: - name: metrics - port: 8083 - targetPort: 8083 + port: 9090 + targetPort: 9090 selector: app: metrics --- @@ -276,7 +276,7 @@ spec: name: https - containerPort: 8080 name: http - - containerPort: 8083 + - containerPort: 9090 name: metrics volumeMounts: - mountPath: /tmp From 6cd08e7e547a4f7be8e03a60b30ef4175cf49386 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Mon, 14 Aug 2023 10:19:05 +0100 Subject: [PATCH 10/19] docs: add license --- cmd/kar-controllers/app/generic-server.go | 30 +++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/cmd/kar-controllers/app/generic-server.go b/cmd/kar-controllers/app/generic-server.go index d04da8a3e..a3fa5a492 100644 --- a/cmd/kar-controllers/app/generic-server.go +++ b/cmd/kar-controllers/app/generic-server.go @@ -1,3 +1,33 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +/* +Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package app import ( From fc7abaa088f9c10c4d608a6c664a6e8cfdfb6f76 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Mon, 14 Aug 2023 10:19:43 +0100 Subject: [PATCH 11/19] refactor: update metrics port to use conventional prom port --- cmd/kar-controllers/app/options/options.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index bfc24c380..d4eef099a 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -21,8 +21,6 @@ import ( "os" "strconv" "strings" - - klog "k8s.io/klog/v2" ) // ServerOption is the main context object for the controller manager. @@ -40,9 +38,9 @@ type ServerOption struct { HeadOfLineHoldingTime int QuotaEnabled bool // Controller is to evaluate quota per request QuotaRestURL string - HealthProbeListenPort int + HealthProbeListenPort int DispatchResourceReservationTimeout int64 - MetricsListenPort int + MetricsListenPort int } // NewServerOption creates a new CMServer with a default config. @@ -68,8 +66,8 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) { fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.") fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.") fs.IntVar(&s.HealthProbeListenPort, "healthProbeListenPort", 8081, "Listen port for health probes. Defaults to ':8081'") - // using port 8083 for metrics as 8082 is used by `custom-metrics-apiserver` - fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 8083, "Listen port for metrics. Defaults to ':8083'") + // using port 9090 for metrics as 8082 is used by `custom-metrics-apiserver` + fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 9090, "Listen port for metrics. Defaults to ':9090'") fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes") } From c1518e710263460bb3e7504c8e618b14c570c024 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Mon, 14 Aug 2023 10:25:46 +0100 Subject: [PATCH 12/19] refactor: move metrics funcs to own file --- .../app/metrics/prom-metrics.go | 26 +++++++++++++++++++ cmd/kar-controllers/app/server.go | 22 ++-------------- 2 files changed, 28 insertions(+), 20 deletions(-) create mode 100644 cmd/kar-controllers/app/metrics/prom-metrics.go diff --git a/cmd/kar-controllers/app/metrics/prom-metrics.go b/cmd/kar-controllers/app/metrics/prom-metrics.go new file mode 100644 index 000000000..273718e1f --- /dev/null +++ b/cmd/kar-controllers/app/metrics/prom-metrics.go @@ -0,0 +1,26 @@ +package metrics + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// Global Prometheus Registry +var globalPromRegistry = prometheus.NewRegistry() + +// metricsHandler returns a http.Handler that serves the prometheus metrics +func PrometheusHandler() http.Handler { + // Add Go module build info. + globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector()) + globalPromRegistry.MustRegister(collectors.NewGoCollector()) + globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + + handlerOpts := promhttp.HandlerOpts{ + ErrorHandling: promhttp.HTTPErrorOnError, + } + + return promhttp.HandlerFor(globalPromRegistry, handlerOpts) +} \ No newline at end of file diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index b00754efb..e9bcd9121 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -26,12 +26,10 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/metrics" "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/collectors" - "github.com/prometheus/client_golang/prometheus/promhttp" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -40,8 +38,6 @@ import ( "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config" ) -// Global Prometheus Registry -var globalPromRegistry = prometheus.NewRegistry() func buildConfig(master, kubeconfig string) (*rest.Config, error) { if master != "" || kubeconfig != "" { @@ -87,20 +83,6 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return nil } -// metricsHandler returns a http.Handler that serves the prometheus metrics -func prometheusHandler() http.Handler { - // Add Go module build info. - globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector()) - globalPromRegistry.MustRegister(collectors.NewGoCollector()) - globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - - handlerOpts := promhttp.HandlerOpts{ - ErrorHandling: promhttp.HTTPErrorOnError, - } - - return promhttp.HandlerFor(globalPromRegistry, handlerOpts) -} - func healthHandler() http.Handler { healthHandler := http.NewServeMux() healthHandler.Handle("/healthz", &health.Handler{}) @@ -112,7 +94,7 @@ func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption g, ctx := errgroup.WithContext(ctx) // metrics server - metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", prometheusHandler()) + metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.PrometheusHandler()) if err != nil { return err } From 7941a94d84a2b4c1475f5a6ee7e6cfddec13a682 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Tue, 22 Aug 2023 12:36:27 +0100 Subject: [PATCH 13/19] refactor: add better error and use custom timeout --- cmd/kar-controllers/app/generic-server.go | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/cmd/kar-controllers/app/generic-server.go b/cmd/kar-controllers/app/generic-server.go index a3fa5a492..53dd88bac 100644 --- a/cmd/kar-controllers/app/generic-server.go +++ b/cmd/kar-controllers/app/generic-server.go @@ -1,21 +1,6 @@ /* Copyright 2017 The Kubernetes Authors. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -/* -Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -109,9 +94,9 @@ func (s *Server) Shutdown() error { return nil } - logger.Info("Stopping server") + logger.Infof("Shutting down endpoint %s at %s (gracefully waiting for %s)", s.endpoint, s.listener.Addr(), s.shutdownTimeout) - shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + shutdownCtx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout) defer cancel() // Try graceful shutdown From 935698a5879d6674500d7633272874690abe9f68 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Tue, 22 Aug 2023 12:37:10 +0100 Subject: [PATCH 14/19] refactor: remove not needed comments and context --- cmd/kar-controllers/app/options/options.go | 1 - cmd/kar-controllers/main.go | 1 - 2 files changed, 2 deletions(-) diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go index d4eef099a..ea6a84e5d 100644 --- a/cmd/kar-controllers/app/options/options.go +++ b/cmd/kar-controllers/app/options/options.go @@ -66,7 +66,6 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) { fs.StringVar(&s.QuotaRestURL, "quotaURL", s.QuotaRestURL, "URL for ReST quota management. Default is none.") fs.IntVar(&s.SecurePort, "secure-port", 6443, "The port on which to serve secured, authenticated access for metrics.") fs.IntVar(&s.HealthProbeListenPort, "healthProbeListenPort", 8081, "Listen port for health probes. Defaults to ':8081'") - // using port 9090 for metrics as 8082 is used by `custom-metrics-apiserver` fs.IntVar(&s.MetricsListenPort, "metricsListenPort", 9090, "Listen port for metrics. Defaults to ':9090'") fs.Int64Var(&s.DispatchResourceReservationTimeout, "dispatchResourceReservationTimeout", s.DispatchResourceReservationTimeout, "Resource reservation timeout for pods to be created once AppWrapper is dispatched, in millisecond. Defaults to '300000', 5 minutes") } diff --git a/cmd/kar-controllers/main.go b/cmd/kar-controllers/main.go index 9ccad9673..bb4f0c6a7 100644 --- a/cmd/kar-controllers/main.go +++ b/cmd/kar-controllers/main.go @@ -58,7 +58,6 @@ func main() { os.Exit(1) } - <-ctx.Done() fmt.Println("Shutting down gracefully") } From 4b6e8645cf371fe7bb00f203a4e0db62af3c6656 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Tue, 22 Aug 2023 12:39:26 +0100 Subject: [PATCH 15/19] refactor: start shutdown in their own goroutines and start metrics server before controller --- cmd/kar-controllers/app/server.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index e9bcd9121..4d8567326 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -72,14 +72,16 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return fmt.Errorf("failed to create a job controller") } - go jobctrl.Run(ctx.Done()) + // Start the health and metrics servers err = startHealthAndMetricsServers(ctx, opt) if err != nil { return err } - <-ctx.Done() + // Start the job controller + go jobctrl.Run(ctx.Done()) + return nil } @@ -107,11 +109,16 @@ func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption g.Go(metricsServer.Start) g.Go(healthServer.Start) - go func() { + g.Go(func() error { <-ctx.Done() - metricsServer.Shutdown() - healthServer.Shutdown() - }() + return metricsServer.Shutdown() + }) - return nil + g.Go(func() error { + <-ctx.Done() + return healthServer.Shutdown() + }) + + return g.Wait() } + From 19a56620d739e12d3acd02a3696d55aa17d62509 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Tue, 22 Aug 2023 12:43:07 +0100 Subject: [PATCH 16/19] docs: remove duplicate copyright notice --- cmd/kar-controllers/main.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/cmd/kar-controllers/main.go b/cmd/kar-controllers/main.go index bb4f0c6a7..68f6212ca 100644 --- a/cmd/kar-controllers/main.go +++ b/cmd/kar-controllers/main.go @@ -1,19 +1,4 @@ /* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -/* Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors. Licensed under the Apache License, Version 2.0 (the "License"); From 0d92fca321b30feca0cef2826060e5b106adbf9e Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 23 Aug 2023 11:14:21 +0100 Subject: [PATCH 17/19] refactor: addition of wait groups to ensure order of shutdown and other minor edits from reviews --- cmd/kar-controllers/app/generic-server.go | 2 +- cmd/kar-controllers/app/server.go | 21 ++++++++++++++++++--- cmd/kar-controllers/main.go | 2 -- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cmd/kar-controllers/app/generic-server.go b/cmd/kar-controllers/app/generic-server.go index 53dd88bac..fea8bce5a 100644 --- a/cmd/kar-controllers/app/generic-server.go +++ b/cmd/kar-controllers/app/generic-server.go @@ -103,7 +103,7 @@ func (s *Server) Shutdown() error { if err := s.httpServer.Shutdown(shutdownCtx); err != nil { return fmt.Errorf("failed to shutdown server gracefully: %v", err) } - return s.httpServer.Shutdown(shutdownCtx) + return nil } // newListener creates a new TCP listener bound to the given address. diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index 4d8567326..e74a2cf79 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,7 @@ import ( "context" "fmt" "net/http" + "sync" "golang.org/x/sync/errgroup" "k8s.io/client-go/rest" @@ -79,8 +80,22 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return err } - // Start the job controller - go jobctrl.Run(ctx.Done()) + // Create the job controller + jobctrl := queuejob.NewJobController(config, opt) + if jobctrl == nil { + return fmt.Errorf("failed to create a job controller") + } + + // Run the job controller in a goroutine and wait for it to exit + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + jobctrl.Run(ctx.Done()) + }() + + wg.Wait() + return nil } diff --git a/cmd/kar-controllers/main.go b/cmd/kar-controllers/main.go index 68f6212ca..d83f44ecd 100644 --- a/cmd/kar-controllers/main.go +++ b/cmd/kar-controllers/main.go @@ -43,6 +43,4 @@ func main() { os.Exit(1) } - fmt.Println("Shutting down gracefully") - } From 2b5987c617a945732d272377e146bdc963263139 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Wed, 23 Aug 2023 12:56:12 +0100 Subject: [PATCH 18/19] refactor: simplify the concurrency logic --- cmd/kar-controllers/app/server.go | 58 +++++++++++-------------------- 1 file changed, 21 insertions(+), 37 deletions(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index e74a2cf79..a53b9ecec 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -73,9 +73,15 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return fmt.Errorf("failed to create a job controller") } + g, gCtx := errgroup.WithContext(ctx) - // Start the health and metrics servers - err = startHealthAndMetricsServers(ctx, opt) + // metrics server + metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.PrometheusHandler()) + if err != nil { + return err + } + + healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler()) if err != nil { return err } @@ -86,54 +92,32 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return fmt.Errorf("failed to create a job controller") } - // Run the job controller in a goroutine and wait for it to exit - wg := sync.WaitGroup{} + wg := &sync.WaitGroup{} wg.Add(1) - go func() { + g.Go(func() error { defer wg.Done() - jobctrl.Run(ctx.Done()) - }() - - wg.Wait() - - - return nil -} - -func healthHandler() http.Handler { - healthHandler := http.NewServeMux() - healthHandler.Handle("/healthz", &health.Handler{}) - return healthHandler -} - -// Starts the health probe listener -func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error { - g, ctx := errgroup.WithContext(ctx) - - // metrics server - metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.PrometheusHandler()) - if err != nil { - return err - } - - healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler()) - if err != nil { - return err - } + jobctrl.Run(gCtx.Done()) + return nil + }) g.Go(metricsServer.Start) g.Go(healthServer.Start) g.Go(func() error { - <-ctx.Done() + wg.Wait() return metricsServer.Shutdown() }) g.Go(func() error { - <-ctx.Done() + wg.Wait() return healthServer.Shutdown() }) - return g.Wait() + return g.Wait() } +func healthHandler() http.Handler { + healthHandler := http.NewServeMux() + healthHandler.Handle("/healthz", &health.Handler{}) + return healthHandler +} From 6a0e17547c42c4e41624cdb5a66e8e7619f8e659 Mon Sep 17 00:00:00 2001 From: Dimitri Saridakis Date: Fri, 15 Sep 2023 12:13:20 +0100 Subject: [PATCH 19/19] refactor: update after rebase --- cmd/kar-controllers/app/server.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go index a53b9ecec..6ed71eac4 100644 --- a/cmd/kar-controllers/app/server.go +++ b/cmd/kar-controllers/app/server.go @@ -68,10 +68,6 @@ func Run(ctx context.Context, opt *options.ServerOption) error { AgentConfigs: strings.Split(opt.AgentConfigs, ","), } - jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig) - if jobctrl == nil { - return fmt.Errorf("failed to create a job controller") - } g, gCtx := errgroup.WithContext(ctx) @@ -86,8 +82,7 @@ func Run(ctx context.Context, opt *options.ServerOption) error { return err } - // Create the job controller - jobctrl := queuejob.NewJobController(config, opt) + jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig) if jobctrl == nil { return fmt.Errorf("failed to create a job controller") }