-
Notifications
You must be signed in to change notification settings - Fork 64
Expose metrics endpoint #486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f1abcdc
16ad276
3298bbf
4180e7a
e42175c
f35715d
a20d09d
3e98689
394f411
6cd08e7
fc7abaa
c1518e7
7941a94
935698a
4b6e864
19a5662
0d92fca
2b5987c
6a0e175
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
Copyright 2017 The Kubernetes Authors. | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
package app | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"net/http" | ||
"strconv" | ||
"time" | ||
|
||
logger "k8s.io/klog/v2" | ||
) | ||
|
||
type ServerOption func(*Server) | ||
|
||
// WithTimeout sets the shutdown timeout for the server. | ||
func WithTimeout(timeout time.Duration) ServerOption { | ||
return func(s *Server) { | ||
s.shutdownTimeout = timeout | ||
} | ||
} | ||
|
||
type Server struct { | ||
httpServer http.Server | ||
listener net.Listener | ||
endpoint string | ||
shutdownTimeout time.Duration | ||
} | ||
|
||
func NewServer(port int, endpoint string, handler http.Handler, options ...ServerOption) (*Server, error) { | ||
addr := "0" | ||
if port != 0 { | ||
addr = ":" + strconv.Itoa(port) | ||
} | ||
|
||
listener, err := newListener(addr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
mux := http.NewServeMux() | ||
mux.Handle(endpoint, handler) | ||
|
||
s := &Server{ | ||
endpoint: endpoint, | ||
listener: listener, | ||
httpServer: http.Server{Handler: mux}, | ||
shutdownTimeout: 30 * time.Second, // Default value | ||
} | ||
|
||
for _, opt := range options { | ||
opt(s) | ||
} | ||
|
||
return s, nil | ||
} | ||
|
||
func (s *Server) Start() (err error) { | ||
if s.listener == nil { | ||
logger.Infof("Serving endpoint %s is disabled", s.endpoint) | ||
return | ||
} | ||
|
||
defer func() { | ||
if r := recover(); r != nil { | ||
err = fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, r) | ||
} | ||
}() | ||
|
||
logger.Infof("Started serving endpoint %s at %s", s.endpoint, s.listener.Addr()) | ||
if e := s.httpServer.Serve(s.listener); e != http.ErrServerClosed { | ||
return fmt.Errorf("serving endpoint %s failed: %v", s.endpoint, e) | ||
} | ||
return | ||
} | ||
|
||
func (s *Server) Shutdown() error { | ||
if s.listener == nil { | ||
return nil | ||
} | ||
|
||
logger.Infof("Shutting down endpoint %s at %s (gracefully waiting for %s)", s.endpoint, s.listener.Addr(), s.shutdownTimeout) | ||
|
||
shutdownCtx, cancel := context.WithTimeout(context.Background(), s.shutdownTimeout) | ||
defer cancel() | ||
|
||
// Try graceful shutdown | ||
if err := s.httpServer.Shutdown(shutdownCtx); err != nil { | ||
return fmt.Errorf("failed to shutdown server gracefully: %v", err) | ||
} | ||
return nil | ||
} | ||
|
||
// newListener creates a new TCP listener bound to the given address. | ||
func newListener(addr string) (net.Listener, error) { | ||
// Add a case to disable serving altogether | ||
if addr == "0" { | ||
return nil, nil | ||
} | ||
|
||
listener, err := net.Listen("tcp", addr) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create listener: %v", err) | ||
} | ||
|
||
return listener, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package metrics | ||
|
||
import ( | ||
"net/http" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/collectors" | ||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||
) | ||
|
||
// Global Prometheus Registry | ||
var globalPromRegistry = prometheus.NewRegistry() | ||
|
||
// metricsHandler returns a http.Handler that serves the prometheus metrics | ||
func PrometheusHandler() http.Handler { | ||
// Add Go module build info. | ||
globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector()) | ||
globalPromRegistry.MustRegister(collectors.NewGoCollector()) | ||
globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) | ||
|
||
handlerOpts := promhttp.HandlerOpts{ | ||
ErrorHandling: promhttp.HTTPErrorOnError, | ||
} | ||
|
||
return promhttp.HandlerFor(globalPromRegistry, handlerOpts) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); | |
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
|
@@ -17,36 +17,42 @@ limitations under the License. | |
package app | ||
|
||
import ( | ||
"net/http" | ||
"strings" | ||
"context" | ||
"fmt" | ||
"net/http" | ||
"sync" | ||
|
||
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" | ||
|
||
"golang.org/x/sync/errgroup" | ||
"k8s.io/client-go/rest" | ||
"k8s.io/client-go/tools/clientcmd" | ||
"k8s.io/utils/pointer" | ||
|
||
"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/metrics" | ||
"github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options" | ||
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config" | ||
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob" | ||
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health" | ||
|
||
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" | ||
|
||
"k8s.io/utils/pointer" | ||
|
||
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config" | ||
) | ||
|
||
|
||
func buildConfig(master, kubeconfig string) (*rest.Config, error) { | ||
if master != "" || kubeconfig != "" { | ||
return clientcmd.BuildConfigFromFlags(master, kubeconfig) | ||
} | ||
return rest.InClusterConfig() | ||
} | ||
|
||
func Run(opt *options.ServerOption) error { | ||
func Run(ctx context.Context, opt *options.ServerOption) error { | ||
restConfig, err := buildConfig(opt.Master, opt.Kubeconfig) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
neverStop := make(chan struct{}) | ||
|
||
restConfig.QPS = 100.0 | ||
restConfig.Burst = 200.0 | ||
|
||
|
@@ -62,29 +68,51 @@ func Run(opt *options.ServerOption) error { | |
AgentConfigs: strings.Split(opt.AgentConfigs, ","), | ||
} | ||
|
||
jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig) | ||
if jobctrl == nil { | ||
return nil | ||
} | ||
jobctrl.Run(neverStop) | ||
|
||
// This call is blocking (unless an error occurs) which equates to <-neverStop | ||
err = listenHealthProbe(opt) | ||
g, gCtx := errgroup.WithContext(ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the creation of the metrics server should be moved before |
||
|
||
// metrics server | ||
metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", metrics.PrometheusHandler()) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Starts the health probe listener | ||
func listenHealthProbe(opt *options.ServerOption) error { | ||
handler := http.NewServeMux() | ||
handler.Handle("/healthz", &health.Handler{}) | ||
err := http.ListenAndServe(opt.HealthProbeListenAddr, handler) | ||
healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler()) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig) | ||
if jobctrl == nil { | ||
return fmt.Errorf("failed to create a job controller") | ||
} | ||
|
||
wg := &sync.WaitGroup{} | ||
wg.Add(1) | ||
g.Go(func() error { | ||
defer wg.Done() | ||
jobctrl.Run(gCtx.Done()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dimakis, cc: @astefanutti There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @z103cb thanks, very good catch. That Go routine should block for the group context channel to close. |
||
return nil | ||
}) | ||
|
||
g.Go(metricsServer.Start) | ||
g.Go(healthServer.Start) | ||
|
||
g.Go(func() error { | ||
wg.Wait() | ||
return metricsServer.Shutdown() | ||
}) | ||
|
||
g.Go(func() error { | ||
wg.Wait() | ||
return healthServer.Shutdown() | ||
}) | ||
|
||
return g.Wait() | ||
} | ||
|
||
func healthHandler() http.Handler { | ||
healthHandler := http.NewServeMux() | ||
healthHandler.Handle("/healthz", &health.Handler{}) | ||
return healthHandler | ||
} |
Uh oh!
There was an error while loading. Please reload this page.