Skip to content

Commit 3e98689

Browse files
committed
refactor: use errgroup and new generic-server
1 parent a20d09d commit 3e98689

File tree

1 file changed

+44
-88
lines changed

1 file changed

+44
-88
lines changed

cmd/kar-controllers/app/server.go

Lines changed: 44 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"context"
2222
"fmt"
2323
"net/http"
24-
"time"
2524

25+
"golang.org/x/sync/errgroup"
2626
"k8s.io/client-go/rest"
2727
"k8s.io/client-go/tools/clientcmd"
2828

@@ -40,6 +40,9 @@ import (
4040
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config"
4141
)
4242

43+
// Global Prometheus Registry
44+
var globalPromRegistry = prometheus.NewRegistry()
45+
4346
func buildConfig(master, kubeconfig string) (*rest.Config, error) {
4447
if master != "" || kubeconfig != "" {
4548
return clientcmd.BuildConfigFromFlags(master, kubeconfig)
@@ -73,23 +76,7 @@ func Run(ctx context.Context, opt *options.ServerOption) error {
7376
return fmt.Errorf("failed to create a job controller")
7477
}
7578

76-
stopCh := make(chan struct{})
77-
// this channel is used to signal that the job controller is done
78-
jobctrlDoneCh := make(chan struct{})
79-
80-
go func() {
81-
defer close(stopCh)
82-
<-ctx.Done()
83-
}()
84-
85-
go func() {
86-
jobctrl.Run(stopCh)
87-
// close the jobctrlDoneCh channel when the job controller is done
88-
close(jobctrlDoneCh)
89-
}()
90-
91-
// wait for the job controller to be done before shutting down the server
92-
<-jobctrlDoneCh
79+
go jobctrl.Run(ctx.Done())
9380

9481
err = startHealthAndMetricsServers(ctx, opt)
9582
if err != nil {
@@ -100,80 +87,49 @@ func Run(ctx context.Context, opt *options.ServerOption) error {
10087
return nil
10188
}
10289

103-
// Starts the health probe listener
104-
func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error {
105-
106-
// Create a new registry.
107-
reg := prometheus.NewRegistry()
108-
90+
// metricsHandler returns a http.Handler that serves the prometheus metrics
91+
func prometheusHandler() http.Handler {
10992
// Add Go module build info.
110-
reg.MustRegister(collectors.NewBuildInfoCollector())
111-
reg.MustRegister(collectors.NewGoCollector())
112-
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
93+
globalPromRegistry.MustRegister(collectors.NewBuildInfoCollector())
94+
globalPromRegistry.MustRegister(collectors.NewGoCollector())
95+
globalPromRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
11396

114-
metricsHandler := http.NewServeMux()
115-
116-
// Use the HTTPErrorOnError option for the Prometheus handler
11797
handlerOpts := promhttp.HandlerOpts{
11898
ErrorHandling: promhttp.HTTPErrorOnError,
11999
}
120100

121-
metricsHandler.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, handlerOpts))
122-
123-
healthHandler := http.NewServeMux()
124-
healthHandler.Handle("/healthz", &health.Handler{})
125-
126-
metricsServer := &http.Server{
127-
Addr: opt.MetricsListenAddr,
128-
Handler: metricsHandler,
129-
}
130-
131-
healthServer := &http.Server{
132-
Addr: opt.HealthProbeListenAddr,
133-
Handler: healthHandler,
134-
}
135-
136-
// make a channel for errors for each server
137-
metricsServerErrChan := make(chan error)
138-
healthServerErrChan := make(chan error)
139-
140-
// start servers in their own goroutines
141-
go func() {
142-
defer close(metricsServerErrChan)
143-
err := metricsServer.ListenAndServe()
144-
if err != nil && err != http.ErrServerClosed {
145-
metricsServerErrChan <- err
146-
}
147-
}()
148-
149-
go func() {
150-
defer close(healthServerErrChan)
151-
err := healthServer.ListenAndServe()
152-
if err != nil && err != http.ErrServerClosed {
153-
healthServerErrChan <- err
154-
}
155-
}()
156-
157-
// use select to wait for either a shutdown signal or an error
158-
select {
159-
case <-ctx.Done():
160-
// received an OS shutdown signal, shut down servers gracefully
161-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
162-
defer cancel()
163-
164-
errM := metricsServer.Shutdown(ctx)
165-
if errM != nil {
166-
return fmt.Errorf("metrics server shutdown error: %v", errM)
167-
}
168-
errH := healthServer.Shutdown(ctx)
169-
if errH != nil {
170-
return fmt.Errorf("health server shutdown error: %v", errH)
171-
}
172-
case err := <-metricsServerErrChan:
173-
return fmt.Errorf("metrics server error: %v", err)
174-
case err := <-healthServerErrChan:
175-
return fmt.Errorf("health server error: %v", err)
176-
}
177-
178-
return nil
101+
return promhttp.HandlerFor(globalPromRegistry, handlerOpts)
102+
}
103+
104+
func healthHandler() http.Handler {
105+
healthHandler := http.NewServeMux()
106+
healthHandler.Handle("/healthz", &health.Handler{})
107+
return healthHandler
108+
}
109+
110+
// Starts the health probe listener
111+
func startHealthAndMetricsServers(ctx context.Context, opt *options.ServerOption) error {
112+
g, ctx := errgroup.WithContext(ctx)
113+
114+
// metrics server
115+
metricsServer, err := NewServer(opt.MetricsListenPort, "/metrics", prometheusHandler())
116+
if err != nil {
117+
return err
118+
}
119+
120+
healthServer, err := NewServer(opt.HealthProbeListenPort, "/healthz", healthHandler())
121+
if err != nil {
122+
return err
123+
}
124+
125+
g.Go(metricsServer.Start)
126+
g.Go(healthServer.Start)
127+
128+
go func() {
129+
<-ctx.Done()
130+
metricsServer.Shutdown()
131+
healthServer.Shutdown()
132+
}()
133+
134+
return nil
179135
}

0 commit comments

Comments
 (0)