diff --git a/internal/mode/static/leader_runnable.go b/internal/mode/static/leader_runnable.go new file mode 100644 index 0000000000..e4a9fcfcba --- /dev/null +++ b/internal/mode/static/leader_runnable.go @@ -0,0 +1,30 @@ +package static + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +type leaderOnlyRunnable struct { + electedCh chan struct{} + runnable manager.Runnable +} + +func newLeaderOnlyRunnable(electedCh chan struct{}, runnable manager.Runnable) *leaderOnlyRunnable { + return &leaderOnlyRunnable{ + electedCh: electedCh, + runnable: runnable, + } +} + +func (l *leaderOnlyRunnable) Start(ctx context.Context) error { + select { + case <-ctx.Done(): + return nil + case <-l.electedCh: + break + } + + return l.runnable.Start(ctx) +} diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index 9667283226..6fd7b7d050 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -45,6 +45,7 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/relationship" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/validation" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry" ) const ( @@ -63,6 +64,8 @@ func init() { utilruntime.Must(apiext.AddToScheme(scheme)) } +// StartManager starts the controller manager. +// nolint: gocyclo func StartManager(cfg config.Config) error { options := manager.Options{ Scheme: scheme, @@ -205,12 +208,15 @@ func StartManager(cfg config.Config) error { leaderElectorLogger := cfg.Logger.WithName("leaderElector") + electedCh := make(chan struct{}) + if cfg.LeaderElection.Enabled { leaderElector, err := newLeaderElectorRunnable(leaderElectorRunnableConfig{ kubeConfig: clusterCfg, recorder: recorder, onStartedLeading: func(ctx context.Context) { leaderElectorLogger.Info("Started leading") + close(electedCh) statusUpdater.Enable(ctx) }, onStoppedLeading: func() { @@ -228,6 +234,19 @@ func StartManager(cfg config.Config) error { if err = mgr.Add(leaderElector); err != nil { return fmt.Errorf("cannot register leader elector: %w", err) } + } else { + close(electedCh) + } + + telemetryJob := newLeaderOnlyRunnable( + electedCh, + telemetry.NewJob( + &telemetry.StdoutExporter{}, + cfg.Logger.WithName("telemetryExporter"), + ), + ) + if err = mgr.Add(telemetryJob); err != nil { + return fmt.Errorf("cannot register telemetry job: %w", err) } cfg.Logger.Info("Starting manager") diff --git a/internal/mode/static/telemetry/job.go b/internal/mode/static/telemetry/job.go new file mode 100644 index 0000000000..cb4f8baf98 --- /dev/null +++ b/internal/mode/static/telemetry/job.go @@ -0,0 +1,52 @@ +package telemetry + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/util/wait" +) + +// Data type (any or not) is yet to be determined +type Exporter interface { + Export(ctx context.Context, data any) error +} + +type StdoutExporter struct{} + +func (s *StdoutExporter) Export(_ context.Context, data any) error { + fmt.Printf("exporting data: %+v\n", data) + return nil +} + +type Data struct{} + +type Job struct { + exporter Exporter + logger logr.Logger +} + +func NewJob(exporter Exporter, logger logr.Logger) *Job { + return &Job{ + exporter: exporter, + logger: logger, + } +} + +func (j *Job) Start(ctx context.Context) error { + j.logger.Info("starting telemetry job") + wait.UntilWithContext(ctx, j.report, 10*time.Second) + j.logger.Info("stopping telemetry job") + return nil +} + +func (j *Job) report(ctx context.Context) { + j.logger.Info("reporting telemetry") + + err := j.exporter.Export(ctx, Data{}) + if err != nil { + j.logger.Error(err, "failed to export telemetry") + } +}