From c2a49b818a2796430f4a277225f914617dfd4114 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Fri, 5 Jan 2024 13:30:51 -0500 Subject: [PATCH 01/10] Add telemetry job Problem: We want to have a telemetry job that periodically reports product telemetry every 24h. For now, telemetry data is empty and report is sent to the debug log. Solution: - Refactor leader election to use controller-runtime manager capabilities. This simplifies the existing code and make it easier to add a telemetry Job. - Add a telemetry Job that periodically reports empty telemetry to the debug log. - Make the period configurable at build time via TELEMETRY_REPORT_PERIOD Makefile variable. Note: leader elector refactoring changes behavior of NGF process when leadership gets lost: Before: the Manager would shutdown waiting for the runnables to exit. After: the Manager doesn't wait. It similar to NGF process panicing. This should be OK, as NGF container will restart and recover any potentially broken state (update not fully populated statuses, restore correct NGINX configuration). Testing: - Unit tests - Manual testing: - Ensure leader election works as expected - both leader and non-pods run successfully. - Ensure NGF container exits when stop being leader. - Ensure an upgrade from Release 1.1.0 is successful for leader election - the leader gets elected among the new pods. - Ensure the telemetry Job reports telemetry multiple times, using a small value of ELEMETRY_REPORT_PERIOD CLOSES https://github.com/nginxinc/nginx-gateway-fabric/issues/1382 --- Makefile | 3 +- cmd/gateway/commands.go | 8 +- cmd/gateway/main.go | 5 +- internal/framework/runnables/doc.go | 5 + internal/framework/runnables/runnables.go | 62 ++++++++++ .../framework/runnables/runnables_test.go | 37 ++++++ internal/mode/static/config/config.go | 4 + internal/mode/static/leader.go | 102 ---------------- internal/mode/static/manager.go | 52 ++++---- internal/mode/static/telemetry/doc.go | 4 + internal/mode/static/telemetry/exporter.go | 38 ++++++ .../mode/static/telemetry/exporter_test.go | 24 ++++ internal/mode/static/telemetry/job.go | 65 ++++++++++ internal/mode/static/telemetry/job_test.go | 68 +++++++++++ .../static/telemetry/telemetry_suite_test.go | 13 ++ .../telemetry/telemetryfakes/fake_exporter.go | 114 ++++++++++++++++++ 16 files changed, 476 insertions(+), 128 deletions(-) create mode 100644 internal/framework/runnables/doc.go create mode 100644 internal/framework/runnables/runnables.go create mode 100644 internal/framework/runnables/runnables_test.go delete mode 100644 internal/mode/static/leader.go create mode 100644 internal/mode/static/telemetry/doc.go create mode 100644 internal/mode/static/telemetry/exporter.go create mode 100644 internal/mode/static/telemetry/exporter_test.go create mode 100644 internal/mode/static/telemetry/job.go create mode 100644 internal/mode/static/telemetry/job_test.go create mode 100644 internal/mode/static/telemetry/telemetry_suite_test.go create mode 100644 internal/mode/static/telemetry/telemetryfakes/fake_exporter.go diff --git a/Makefile b/Makefile index 6caa2af686..0105bd4062 100644 --- a/Makefile +++ b/Makefile @@ -8,11 +8,12 @@ NGINX_CONF_DIR = internal/mode/static/nginx/conf NJS_DIR = internal/mode/static/nginx/modules/src NGINX_DOCKER_BUILD_PLUS_ARGS = --secret id=nginx-repo.crt,src=nginx-repo.crt --secret id=nginx-repo.key,src=nginx-repo.key BUILD_AGENT=local +TELEMETRY_REPORT_PERIOD = 24h GW_API_VERSION = 1.0.0 INSTALL_WEBHOOK = false # go build flags - should not be overridden by the user -GO_LINKER_FlAGS_VARS = -X main.version=${VERSION} -X main.commit=${GIT_COMMIT} -X main.date=${DATE} +GO_LINKER_FlAGS_VARS = -X main.version=${VERSION} -X main.commit=${GIT_COMMIT} -X main.date=${DATE} -X main.telemetryReportPeriod=${TELEMETRY_REPORT_PERIOD} GO_LINKER_FLAGS_OPTIMIZATIONS = -s -w GO_LINKER_FLAGS = $(GO_LINKER_FLAGS_OPTIMIZATIONS) $(GO_LINKER_FlAGS_VARS) diff --git a/cmd/gateway/commands.go b/cmd/gateway/commands.go index 6204940a54..d4eaa3ccc5 100644 --- a/cmd/gateway/commands.go +++ b/cmd/gateway/commands.go @@ -131,6 +131,11 @@ func createStaticModeCommand() *cobra.Command { return errors.New("POD_NAME environment variable must be set") } + period, err := time.ParseDuration(telemetryReportPeriod) + if err != nil { + return fmt.Errorf("error parsing telemetry report period: %w", err) + } + var gwNsName *types.NamespacedName if cmd.Flags().Changed(gatewayFlag) { gwNsName = &gateway.value @@ -163,7 +168,8 @@ func createStaticModeCommand() *cobra.Command { LockName: leaderElectionLockName.String(), Identity: podName, }, - Plus: plus, + Plus: plus, + TelemetryReportPeriod: period, } if err := static.StartManager(conf); err != nil { diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index 79509340c9..31ffdf61d5 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -5,11 +5,14 @@ import ( "os" ) +// Set during go build var ( - // Set during go build version string commit string date string + + // telemetryReportPeriod is the period at which telemetry reports are sent. + telemetryReportPeriod string ) func main() { diff --git a/internal/framework/runnables/doc.go b/internal/framework/runnables/doc.go new file mode 100644 index 0000000000..99a52fd2ac --- /dev/null +++ b/internal/framework/runnables/doc.go @@ -0,0 +1,5 @@ +/* +Package runnables provides helper types for creating runnables for the controller-runtime manager when +leader election is enabled. +*/ +package runnables diff --git a/internal/framework/runnables/runnables.go b/internal/framework/runnables/runnables.go new file mode 100644 index 0000000000..d960475008 --- /dev/null +++ b/internal/framework/runnables/runnables.go @@ -0,0 +1,62 @@ +package runnables + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// Leader is a Runnable that needs to be run only when the current instance is the leader. +type Leader struct { + manager.Runnable +} + +var ( + _ manager.LeaderElectionRunnable = &Leader{} + _ manager.Runnable = &Leader{} +) + +func (r *Leader) NeedLeaderElection() bool { + return true +} + +// LeaderOrNonLeader is a Runnable that needs to be run regardless of whether the current instance is the leader. +type LeaderOrNonLeader struct { + manager.Runnable +} + +var ( + _ manager.LeaderElectionRunnable = &LeaderOrNonLeader{} + _ manager.Runnable = &LeaderOrNonLeader{} +) + +func (r *LeaderOrNonLeader) NeedLeaderElection() bool { + return false +} + +// EnableAfterBecameLeader is a Runnable that will call the enable function when the current instance becomes +// the leader. +type EnableAfterBecameLeader struct { + enable func(context.Context) +} + +var ( + _ manager.LeaderElectionRunnable = &EnableAfterBecameLeader{} + _ manager.Runnable = &EnableAfterBecameLeader{} +) + +// NewEnableAfterBecameLeader creates a new EnableAfterBecameLeader Runnable. +func NewEnableAfterBecameLeader(enable func(context.Context)) *EnableAfterBecameLeader { + return &EnableAfterBecameLeader{ + enable: enable, + } +} + +func (j *EnableAfterBecameLeader) Start(ctx context.Context) error { + j.enable(ctx) + return nil +} + +func (j *EnableAfterBecameLeader) NeedLeaderElection() bool { + return true +} diff --git a/internal/framework/runnables/runnables_test.go b/internal/framework/runnables/runnables_test.go new file mode 100644 index 0000000000..faf84a90af --- /dev/null +++ b/internal/framework/runnables/runnables_test.go @@ -0,0 +1,37 @@ +package runnables + +import ( + "context" + "testing" + + . "github.com/onsi/gomega" +) + +func TestLeader(t *testing.T) { + leader := &Leader{} + + g := NewWithT(t) + g.Expect(leader.NeedLeaderElection()).To(BeTrue()) +} + +func TestLeaderOrNonLeader(t *testing.T) { + leaderOrNonLeader := &LeaderOrNonLeader{} + + g := NewWithT(t) + g.Expect(leaderOrNonLeader.NeedLeaderElection()).To(BeFalse()) +} + +func TestEnableAfterBecameLeader(t *testing.T) { + enabled := false + enableAfterBecameLeader := NewEnableAfterBecameLeader(func(_ context.Context) { + enabled = true + }) + + g := NewWithT(t) + g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue()) + + err := enableAfterBecameLeader.Start(context.Background()) + g.Expect(err).To(BeNil()) + + g.Expect(enabled).To(BeTrue()) +} diff --git a/internal/mode/static/config/config.go b/internal/mode/static/config/config.go index 965315ad17..d434fce894 100644 --- a/internal/mode/static/config/config.go +++ b/internal/mode/static/config/config.go @@ -1,6 +1,8 @@ package config import ( + "time" + "github.com/go-logr/logr" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" @@ -32,6 +34,8 @@ type Config struct { MetricsConfig MetricsConfig // HealthConfig specifies the health probe config. HealthConfig HealthConfig + // TelemetryReportPeriod is the period at which telemetry reports are sent. + TelemetryReportPeriod time.Duration } // GatewayPodConfig contains information about this Pod. diff --git a/internal/mode/static/leader.go b/internal/mode/static/leader.go deleted file mode 100644 index 4e53f5539d..0000000000 --- a/internal/mode/static/leader.go +++ /dev/null @@ -1,102 +0,0 @@ -package static - -import ( - "context" - "errors" - "fmt" - "time" - - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" -) - -const ( - // These values are the defaults used by the core client. - renewDeadline = 10 * time.Second - leaseDuration = 15 * time.Second - retryPeriod = 2 * time.Second -) - -// leaderElectorRunnableConfig holds all the configuration for the leaderElector struct. -type leaderElectorRunnableConfig struct { - // kubeConfig is the kube config for the cluster. Used to create coreV1 and coordinationV1 clients which are needed - // for leader election. - kubeConfig *rest.Config - // recorder is the Kubernetes event recorder. Used to record events on the lease lock. - recorder record.EventRecorder - // onStartedLeading is the callback that is invoked asynchronously when the Pod starts leading. - onStartedLeading func(ctx context.Context) - // onStoppedLeading is the callback that is invoked asynchronously when the Pod stops leading. - onStoppedLeading func() - // lockNs is the namespace where the LeaseLock resource lives. - lockNs string - // lockName is the name of the LeaseLock resource. - lockName string - // identity is the unique name of this Pod. Used to identify the leader. - identity string -} - -// leaderElectorRunnable wraps a leaderelection.LeaderElector so that it implements the manager.Runnable interface -// and can be managed by the manager. -type leaderElectorRunnable struct { - le *leaderelection.LeaderElector -} - -// Start runs the leaderelection.LeaderElector and blocks until the context is canceled or the leader lease is lost. -// If the leader lease is lost, Start returns an error, and the controller-runtime manager will exit, causing the Pod -// to restart. This is necessary otherwise components that need leader election might continue to run after the leader -// lease was lost. -func (l *leaderElectorRunnable) Start(ctx context.Context) error { - l.le.Run(ctx) - - // Run exits if the context is canceled or the leader lease is lost. We only want to return an error if the - // context is not canceled. - select { - case <-ctx.Done(): - return nil - default: - return errors.New("leader election lost") - } -} - -// IsLeader returns if the Pod is the current leader. -func (l *leaderElectorRunnable) IsLeader() bool { - return l.le.IsLeader() -} - -// newLeaderElector returns a new leader elector client. -func newLeaderElectorRunnable(config leaderElectorRunnableConfig) (*leaderElectorRunnable, error) { - lock, err := resourcelock.NewFromKubeconfig( - resourcelock.LeasesResourceLock, - config.lockNs, - config.lockName, - resourcelock.ResourceLockConfig{ - Identity: config.identity, - EventRecorder: config.recorder, - }, - config.kubeConfig, - renewDeadline, - ) - if err != nil { - return nil, fmt.Errorf("error creating lease lock for leader election: %w", err) - } - - leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: lock, - LeaseDuration: leaseDuration, - RenewDeadline: renewDeadline, - RetryPeriod: retryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: config.onStartedLeading, - OnStoppedLeading: config.onStoppedLeading, - }, - Name: lock.Describe(), - }) - if err != nil { - return nil, fmt.Errorf("error creating leader elector: %w", err) - } - - return &leaderElectorRunnable{le: leaderElector}, nil -} diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index 3aa4850813..527ad2eff4 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -20,6 +20,7 @@ import ( "k8s.io/client-go/tools/record" ctlr "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + ctrlcfg "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -34,6 +35,8 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/framework/controller/predicate" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/events" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/gatewayclass" + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/helpers" + "github.com/nginxinc/nginx-gateway-fabric/internal/framework/runnables" "github.com/nginxinc/nginx-gateway-fabric/internal/framework/status" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/config" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/metrics/collectors" @@ -45,6 +48,7 @@ import ( "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/relationship" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/resolver" "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/state/validation" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry" ) const ( @@ -69,6 +73,17 @@ func StartManager(cfg config.Config) error { Scheme: scheme, Logger: cfg.Logger, Metrics: getMetricsOptions(cfg.MetricsConfig), + // Note: when the leadership is lost, the manager will exist returning an error in the Start() method. + // However, it will not wait for any Runnable it starts to finish, meaning any in-progress operations + // might get terminated half-way. + LeaderElection: true, + LeaderElectionNamespace: cfg.GatewayPodConfig.Namespace, + LeaderElectionID: cfg.LeaderElection.LockName, + LeaderElectionReleaseOnCancel: true, + Controller: ctrlcfg.Controller{ + // All of our controllers still need to work in case of non-leader pods + NeedLeaderElection: helpers.GetPointer(false), + }, } if cfg.HealthConfig.Enabled { @@ -211,35 +226,26 @@ func StartManager(cfg config.Config) error { firstBatchPreparer, ) - if err = mgr.Add(eventLoop); err != nil { + if err = mgr.Add(&runnables.LeaderOrNonLeader{Runnable: eventLoop}); err != nil { return fmt.Errorf("cannot register event loop: %w", err) } - leaderElectorLogger := cfg.Logger.WithName("leaderElector") + if err = mgr.Add(runnables.NewEnableAfterBecameLeader(statusUpdater.Enable)); err != nil { + return fmt.Errorf("cannot register enabling status updater on becoming leader: %w", err) + } - if cfg.LeaderElection.Enabled { - leaderElector, err := newLeaderElectorRunnable(leaderElectorRunnableConfig{ - kubeConfig: clusterCfg, - recorder: recorder, - onStartedLeading: func(ctx context.Context) { - leaderElectorLogger.Info("Started leading") - statusUpdater.Enable(ctx) + telemetryJob := &runnables.Leader{ + Runnable: telemetry.NewJob( + telemetry.JobConfig{ + Exporter: telemetry.NewLoggingExporter(cfg.Logger.WithName("telemetryExporter").V(1 /* debug */)), + Logger: cfg.Logger.WithName("telemetryJob"), + Period: cfg.TelemetryReportPeriod, }, - onStoppedLeading: func() { - leaderElectorLogger.Info("Stopped leading") - statusUpdater.Disable() - }, - lockNs: cfg.GatewayPodConfig.Namespace, - lockName: cfg.LeaderElection.LockName, - identity: cfg.LeaderElection.Identity, - }) - if err != nil { - return err - } + ), + } - if err = mgr.Add(leaderElector); err != nil { - return fmt.Errorf("cannot register leader elector: %w", err) - } + if err = mgr.Add(telemetryJob); err != nil { + return fmt.Errorf("cannot register telemetry job: %w", err) } cfg.Logger.Info("Starting manager") diff --git a/internal/mode/static/telemetry/doc.go b/internal/mode/static/telemetry/doc.go new file mode 100644 index 0000000000..b5ee61ac21 --- /dev/null +++ b/internal/mode/static/telemetry/doc.go @@ -0,0 +1,4 @@ +/* +Package telemetry is responsible for collecting and sending product telemetry data. +*/ +package telemetry diff --git a/internal/mode/static/telemetry/exporter.go b/internal/mode/static/telemetry/exporter.go new file mode 100644 index 0000000000..c010510926 --- /dev/null +++ b/internal/mode/static/telemetry/exporter.go @@ -0,0 +1,38 @@ +package telemetry + +import ( + "context" + + "github.com/go-logr/logr" +) + +// Data is telemetry data. +// Note: this type might change once https://github.com/nginxinc/nginx-gateway-fabric/issues/1318 is implemented. +type Data struct{} + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 . Exporter + +// Exporter exports telemetry data to some destination. +// Note: this is a temporary interface. It will be finalized once the Exporter of the common telemetry library +// https://github.com/nginxinc/nginx-gateway-fabric/issues/1318 is implemented. +type Exporter interface { + Export(ctx context.Context, data Data) error +} + +// LoggingExporter logs telemetry data. +type LoggingExporter struct { + logger logr.Logger +} + +// NewLoggingExporter creates a new LoggingExporter. +func NewLoggingExporter(logger logr.Logger) *LoggingExporter { + return &LoggingExporter{ + logger: logger, + } +} + +// Export logs the provided telemetry data. +func (e *LoggingExporter) Export(_ context.Context, data Data) error { + e.logger.Info("Exporting telemetry", "data", data) + return nil +} diff --git a/internal/mode/static/telemetry/exporter_test.go b/internal/mode/static/telemetry/exporter_test.go new file mode 100644 index 0000000000..66f80606f4 --- /dev/null +++ b/internal/mode/static/telemetry/exporter_test.go @@ -0,0 +1,24 @@ +package telemetry + +import ( + "bytes" + "context" + "testing" + + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func TestLoggingExporter(t *testing.T) { + g := NewWithT(t) + + var buffer bytes.Buffer + logger := zap.New(zap.WriteTo(&buffer)) + exporter := NewLoggingExporter(logger) + + err := exporter.Export(context.Background(), Data{}) + + g.Expect(err).To(BeNil()) + g.Expect(buffer.String()).To(ContainSubstring(`"level":"info"`)) + g.Expect(buffer.String()).To(ContainSubstring(`"msg":"Exporting telemetry"`)) +} diff --git a/internal/mode/static/telemetry/job.go b/internal/mode/static/telemetry/job.go new file mode 100644 index 0000000000..fba826cd25 --- /dev/null +++ b/internal/mode/static/telemetry/job.go @@ -0,0 +1,65 @@ +package telemetry + +import ( + "context" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// JobConfig is the configuration for the telemetry job. +type JobConfig struct { + // Exporter is the exporter to use for exporting telemetry data. + Exporter Exporter + // Logger is the logger. + Logger logr.Logger + // Period defines the period of the telemetry job. The job will run every Period. + Period time.Duration +} + +// Job periodically exports telemetry data using the provided exporter. +type Job struct { + cfg JobConfig +} + +// NewJob creates a new telemetry job. +func NewJob(cfg JobConfig) *Job { + return &Job{ + cfg: cfg, + } +} + +// Start starts the telemetry job. +// Implements controller-runtime manager.Runnable +func (j *Job) Start(ctx context.Context) error { + j.cfg.Logger.Info("Starting telemetry job") + + report := func(ctx context.Context) { + // Gather telemetry + j.cfg.Logger.V(1).Info("Gathering telemetry") + + // We will need to gather data as defined in https://github.com/nginxinc/nginx-gateway-fabric/issues/793 + data := Data{} + + // Export telemetry + j.cfg.Logger.V(1).Info("Exporting telemetry") + + if err := j.cfg.Exporter.Export(ctx, data); err != nil { + j.cfg.Logger.Error(err, "Failed to export telemetry") + } + } + + const ( + jitterFactor = 0.1 // If the period is 10 seconds, the jitter will be up to 1 second. + sliding = true // This means the period with jitter will be calculated after each report() call. + ) + + wait.JitterUntilWithContext(ctx, report, j.cfg.Period, jitterFactor, sliding) + + j.cfg.Logger.Info("Stopping telemetry job") + return nil +} + +var _ manager.Runnable = &Job{} diff --git a/internal/mode/static/telemetry/job_test.go b/internal/mode/static/telemetry/job_test.go new file mode 100644 index 0000000000..e8db641e1d --- /dev/null +++ b/internal/mode/static/telemetry/job_test.go @@ -0,0 +1,68 @@ +package telemetry_test + +import ( + "context" + "errors" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry" + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry/telemetryfakes" +) + +var _ = Describe("Job", func() { + var ( + job *telemetry.Job + exporter *telemetryfakes.FakeExporter + ) + const timeout = 10 * time.Second + + BeforeEach(func() { + exporter = &telemetryfakes.FakeExporter{} + job = telemetry.NewJob(telemetry.JobConfig{ + Exporter: exporter, + Logger: zap.New(), + Period: 1 * time.Millisecond, // 1ms is much smaller than timeout so the Job should report a few times + }) + }) + + DescribeTable( + "Job runs with a few reports without any errors", + func(exporterError error) { + // The fact that exporter return an error must not affect how many times the Job makes a report. + exporter.ExportReturns(exporterError) + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + + errCh := make(chan error) + go func() { + err := job.Start(ctx) + if errors.Is(err, context.Canceled) { + // we're not interested in context cancellation errors, + // as it might be returned (although highly unlikely) during normal test execution + err = nil + } + + errCh <- err + close(errCh) + }() + + const minReports = 3 // ensure that the Job reports at least this many times + + Eventually(exporter.ExportCallCount).Should(BeNumerically(">=", minReports)) + for i := 0; i < minReports; i++ { + _, data := exporter.ExportArgsForCall(i) + Expect(data).To(Equal(telemetry.Data{})) + } + + cancel() + Eventually(errCh).Should(Receive(BeNil())) + Eventually(errCh).Should(BeClosed()) + }, + Entry("Job runs with Exporter not returning errors", nil), + Entry("Job runs with Exporter returning an error", errors.New("some error")), + ) +}) diff --git a/internal/mode/static/telemetry/telemetry_suite_test.go b/internal/mode/static/telemetry/telemetry_suite_test.go new file mode 100644 index 0000000000..2eae64e153 --- /dev/null +++ b/internal/mode/static/telemetry/telemetry_suite_test.go @@ -0,0 +1,13 @@ +package telemetry + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestTelemetry(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Telemetry Suite") +} diff --git a/internal/mode/static/telemetry/telemetryfakes/fake_exporter.go b/internal/mode/static/telemetry/telemetryfakes/fake_exporter.go new file mode 100644 index 0000000000..741ab3cdec --- /dev/null +++ b/internal/mode/static/telemetry/telemetryfakes/fake_exporter.go @@ -0,0 +1,114 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package telemetryfakes + +import ( + "context" + "sync" + + "github.com/nginxinc/nginx-gateway-fabric/internal/mode/static/telemetry" +) + +type FakeExporter struct { + ExportStub func(context.Context, telemetry.Data) error + exportMutex sync.RWMutex + exportArgsForCall []struct { + arg1 context.Context + arg2 telemetry.Data + } + exportReturns struct { + result1 error + } + exportReturnsOnCall map[int]struct { + result1 error + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeExporter) Export(arg1 context.Context, arg2 telemetry.Data) error { + fake.exportMutex.Lock() + ret, specificReturn := fake.exportReturnsOnCall[len(fake.exportArgsForCall)] + fake.exportArgsForCall = append(fake.exportArgsForCall, struct { + arg1 context.Context + arg2 telemetry.Data + }{arg1, arg2}) + stub := fake.ExportStub + fakeReturns := fake.exportReturns + fake.recordInvocation("Export", []interface{}{arg1, arg2}) + fake.exportMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeExporter) ExportCallCount() int { + fake.exportMutex.RLock() + defer fake.exportMutex.RUnlock() + return len(fake.exportArgsForCall) +} + +func (fake *FakeExporter) ExportCalls(stub func(context.Context, telemetry.Data) error) { + fake.exportMutex.Lock() + defer fake.exportMutex.Unlock() + fake.ExportStub = stub +} + +func (fake *FakeExporter) ExportArgsForCall(i int) (context.Context, telemetry.Data) { + fake.exportMutex.RLock() + defer fake.exportMutex.RUnlock() + argsForCall := fake.exportArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeExporter) ExportReturns(result1 error) { + fake.exportMutex.Lock() + defer fake.exportMutex.Unlock() + fake.ExportStub = nil + fake.exportReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeExporter) ExportReturnsOnCall(i int, result1 error) { + fake.exportMutex.Lock() + defer fake.exportMutex.Unlock() + fake.ExportStub = nil + if fake.exportReturnsOnCall == nil { + fake.exportReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.exportReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeExporter) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.exportMutex.RLock() + defer fake.exportMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeExporter) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ telemetry.Exporter = new(FakeExporter) From 744b402ac0af298e7c94fa8f82cd266b6b2e661d Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Mon, 8 Jan 2024 10:31:52 -0500 Subject: [PATCH 02/10] Update internal/mode/static/manager.go Co-authored-by: Saylor Berman --- internal/mode/static/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index 527ad2eff4..fb127c0f80 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -73,7 +73,7 @@ func StartManager(cfg config.Config) error { Scheme: scheme, Logger: cfg.Logger, Metrics: getMetricsOptions(cfg.MetricsConfig), - // Note: when the leadership is lost, the manager will exist returning an error in the Start() method. + // Note: when the leadership is lost, the manager will return an error in the Start() method. // However, it will not wait for any Runnable it starts to finish, meaning any in-progress operations // might get terminated half-way. LeaderElection: true, From d226662d3c10611d6bf4e8389bdd9e9bedd9c070 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Mon, 8 Jan 2024 10:40:37 -0500 Subject: [PATCH 03/10] Updated jitter --- internal/mode/static/telemetry/job.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/mode/static/telemetry/job.go b/internal/mode/static/telemetry/job.go index fba826cd25..6da8278513 100644 --- a/internal/mode/static/telemetry/job.go +++ b/internal/mode/static/telemetry/job.go @@ -52,8 +52,10 @@ func (j *Job) Start(ctx context.Context) error { } const ( - jitterFactor = 0.1 // If the period is 10 seconds, the jitter will be up to 1 second. - sliding = true // This means the period with jitter will be calculated after each report() call. + // 10 min jitter is enough per telemetry destination recommendation + // For the default period of 24 hours, jitter will be 10min /(24*60)min = 0.0069 + jitterFactor = 10.0 / (24 * 60) // added jitter is bound by jitterFactor * period + sliding = true // This means the period with jitter will be calculated after each report() call. ) wait.JitterUntilWithContext(ctx, report, j.cfg.Period, jitterFactor, sliding) From f05598a41c360a965455a8f96ef1fc784267510f Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Mon, 8 Jan 2024 10:41:48 -0500 Subject: [PATCH 04/10] Update error message --- internal/mode/static/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index fb127c0f80..dcc0b9243f 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -231,7 +231,7 @@ func StartManager(cfg config.Config) error { } if err = mgr.Add(runnables.NewEnableAfterBecameLeader(statusUpdater.Enable)); err != nil { - return fmt.Errorf("cannot register enabling status updater on becoming leader: %w", err) + return fmt.Errorf("cannot register status updater: %w", err) } telemetryJob := &runnables.Leader{ From 3050949bd3d0b9a80305c0d18ef8a4eda9c3880e Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Mon, 8 Jan 2024 11:05:40 -0500 Subject: [PATCH 05/10] Update goreleaser settings --- .goreleaser.yml | 2 ++ Makefile | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.goreleaser.yml b/.goreleaser.yml index a5793d31f6..2da49f57cf 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -14,6 +14,8 @@ builds: - all=-trimpath={{.Env.GOPATH}} asmflags: - all=-trimpath={{.Env.GOPATH}} + ldflags: + - -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.Date}} -X main.telemetryReportPeriod=24h main: ./cmd/gateway/ binary: gateway diff --git a/Makefile b/Makefile index 0105bd4062..9a045be7a3 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ NGINX_CONF_DIR = internal/mode/static/nginx/conf NJS_DIR = internal/mode/static/nginx/modules/src NGINX_DOCKER_BUILD_PLUS_ARGS = --secret id=nginx-repo.crt,src=nginx-repo.crt --secret id=nginx-repo.key,src=nginx-repo.key BUILD_AGENT=local -TELEMETRY_REPORT_PERIOD = 24h +TELEMETRY_REPORT_PERIOD = 24h # also configured in goreleaser.yml GW_API_VERSION = 1.0.0 INSTALL_WEBHOOK = false From bb12043576424781da87bce48a2f791413da3766 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Mon, 8 Jan 2024 11:09:10 -0500 Subject: [PATCH 06/10] Fix YAML linting --- .yamllint.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.yamllint.yaml b/.yamllint.yaml index 685be0f800..3a77819f42 100644 --- a/.yamllint.yaml +++ b/.yamllint.yaml @@ -43,6 +43,7 @@ rules: deploy/manifests/nginx-gateway.yaml deploy/manifests/crds tests/longevity/manifests/cronjob.yaml + .goreleaser.yml new-line-at-end-of-file: enable new-lines: enable octal-values: disable From 57b25766ff508267dd9bb55c10a6b91c362696d8 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Mon, 8 Jan 2024 17:26:11 -0500 Subject: [PATCH 07/10] Simplify test for Start of Job --- internal/mode/static/telemetry/job_test.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/internal/mode/static/telemetry/job_test.go b/internal/mode/static/telemetry/job_test.go index e8db641e1d..6be4001d54 100644 --- a/internal/mode/static/telemetry/job_test.go +++ b/internal/mode/static/telemetry/job_test.go @@ -39,14 +39,7 @@ var _ = Describe("Job", func() { errCh := make(chan error) go func() { - err := job.Start(ctx) - if errors.Is(err, context.Canceled) { - // we're not interested in context cancellation errors, - // as it might be returned (although highly unlikely) during normal test execution - err = nil - } - - errCh <- err + errCh <- job.Start(ctx) close(errCh) }() From be8b4e0ab4c37229ef8ea440b00c8597c582ac7d Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Mon, 8 Jan 2024 17:55:31 -0500 Subject: [PATCH 08/10] Changed LeaderElectionReleaseOnCancel to false and added a comment --- internal/mode/static/manager.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/internal/mode/static/manager.go b/internal/mode/static/manager.go index dcc0b9243f..03ee119232 100644 --- a/internal/mode/static/manager.go +++ b/internal/mode/static/manager.go @@ -76,10 +76,14 @@ func StartManager(cfg config.Config) error { // Note: when the leadership is lost, the manager will return an error in the Start() method. // However, it will not wait for any Runnable it starts to finish, meaning any in-progress operations // might get terminated half-way. - LeaderElection: true, - LeaderElectionNamespace: cfg.GatewayPodConfig.Namespace, - LeaderElectionID: cfg.LeaderElection.LockName, - LeaderElectionReleaseOnCancel: true, + LeaderElection: true, + LeaderElectionNamespace: cfg.GatewayPodConfig.Namespace, + LeaderElectionID: cfg.LeaderElection.LockName, + // We're not enabling LeaderElectionReleaseOnCancel because when the Manager stops gracefully, it waits + // for all started Runnables (including Leader-only ones) to finish. Otherwise, the new leader might start + // running Leader-only Runnables before the old leader has finished running them. + // See the doc comment for the LeaderElectionReleaseOnCancel for more details. + LeaderElectionReleaseOnCancel: false, Controller: ctrlcfg.Controller{ // All of our controllers still need to work in case of non-leader pods NeedLeaderElection: helpers.GetPointer(false), From 1a27c77c0b093599512db27c22b8a04a03e47392 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Wed, 10 Jan 2024 10:34:11 -0500 Subject: [PATCH 09/10] Add an assertion in TestEnableAfterBecameLeader --- internal/framework/runnables/runnables_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/framework/runnables/runnables_test.go b/internal/framework/runnables/runnables_test.go index faf84a90af..f756146cd7 100644 --- a/internal/framework/runnables/runnables_test.go +++ b/internal/framework/runnables/runnables_test.go @@ -29,6 +29,7 @@ func TestEnableAfterBecameLeader(t *testing.T) { g := NewWithT(t) g.Expect(enableAfterBecameLeader.NeedLeaderElection()).To(BeTrue()) + g.Expect(enabled).To(BeFalse()) err := enableAfterBecameLeader.Start(context.Background()) g.Expect(err).To(BeNil()) From 06832aab238fdae14401456a03bc76ab8733da35 Mon Sep 17 00:00:00 2001 From: Michael Pleshakov Date: Wed, 10 Jan 2024 10:36:36 -0500 Subject: [PATCH 10/10] Update minReports and extend the comment --- internal/mode/static/telemetry/job_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/mode/static/telemetry/job_test.go b/internal/mode/static/telemetry/job_test.go index 6be4001d54..972f12ed1e 100644 --- a/internal/mode/static/telemetry/job_test.go +++ b/internal/mode/static/telemetry/job_test.go @@ -43,7 +43,7 @@ var _ = Describe("Job", func() { close(errCh) }() - const minReports = 3 // ensure that the Job reports at least this many times + const minReports = 2 // ensure that the Job reports more than once: it doesn't exit after the first report Eventually(exporter.ExportCallCount).Should(BeNumerically(">=", minReports)) for i := 0; i < minReports; i++ {