From fe6759b7d6f0e77e2f8de6bc74f48661c59354f1 Mon Sep 17 00:00:00 2001 From: Shaun Date: Wed, 7 Feb 2024 09:46:16 +0000 Subject: [PATCH] Telemetry Job (#4896) --- charts/nginx-ingress/README.md | 3 +- charts/nginx-ingress/templates/_helpers.tpl | 1 + charts/nginx-ingress/values.schema.json | 8 ++ charts/nginx-ingress/values.yaml | 3 + cmd/nginx-ingress/flags.go | 18 +++ cmd/nginx-ingress/flags_test.go | 24 ++++ cmd/nginx-ingress/main.go | 5 +- internal/k8s/controller.go | 29 +++++ internal/k8s/controller_test.go | 40 ++++++ internal/k8s/leader.go | 4 + internal/nginx/fake_manager.go | 2 +- internal/telemetry/telemetry.go | 135 ++++++++++++++++++++ internal/telemetry/telemetry_test.go | 61 +++++++++ 13 files changed, 330 insertions(+), 3 deletions(-) create mode 100644 internal/telemetry/telemetry.go create mode 100644 internal/telemetry/telemetry_test.go diff --git a/charts/nginx-ingress/README.md b/charts/nginx-ingress/README.md index 2804458bf5..ceeb03bc9e 100644 --- a/charts/nginx-ingress/README.md +++ b/charts/nginx-ingress/README.md @@ -464,9 +464,10 @@ The following tables lists the configurable parameters of the NGINX Ingress Cont |`controller.strategy` | Specifies the strategy used to replace old Pods with new ones. Docs for [Deployment update strategy](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#strategy) and [Daemonset update strategy](https://kubernetes.io/docs/tasks/manage-daemon/update-daemon-set/#daemonset-update-strategy) | {} | |`controller.disableIPV6` | Disable IPV6 listeners explicitly for nodes that do not support the IPV6 stack. | false | |`controller.defaultHTTPListenerPort` | Sets the port for the HTTP `default_server` listener. | 80 | -|`controller.defaultHTTPSListenerPort` | Sets the port for the HTTPS `default_server` listener. | 443 | +|`controller.defaultHTTPSListenerPort` | Sets the port for the HTTPS `default_server` listener. | 443 | |`controller.readOnlyRootFilesystem` | Configure root filesystem as read-only and add volumes for temporary data. | false | |`controller.enableSSLDynamicReload` | Enable lazy loading for SSL Certificates. | true | +|`controller.enableTelemetryReporting` | Enable telemetry reporting. | true | |`rbac.create` | Configures RBAC. | true | |`prometheus.create` | Expose NGINX or NGINX Plus metrics in the Prometheus format. | true | |`prometheus.port` | Configures the port to scrape the metrics. | 9113 | diff --git a/charts/nginx-ingress/templates/_helpers.tpl b/charts/nginx-ingress/templates/_helpers.tpl index 2f5add833d..88a3d5c2bc 100644 --- a/charts/nginx-ingress/templates/_helpers.tpl +++ b/charts/nginx-ingress/templates/_helpers.tpl @@ -223,4 +223,5 @@ Build the args for the service binary. - -ready-status-port={{ .Values.controller.readyStatus.port }} - -enable-latency-metrics={{ .Values.controller.enableLatencyMetrics }} - -ssl-dynamic-reload={{ .Values.controller.enableSSLDynamicReload }} +- -enable-telemetry-reporting={{ .Values.controller.enableTelemetryReporting}} {{- end -}} diff --git a/charts/nginx-ingress/values.schema.json b/charts/nginx-ingress/values.schema.json index b0fd55061f..15ceaeeec1 100644 --- a/charts/nginx-ingress/values.schema.json +++ b/charts/nginx-ingress/values.schema.json @@ -1367,6 +1367,14 @@ "examples": [ true ] + }, + "enableTelemetryReporting": { + "type": "boolean", + "default": true, + "title": "Enable telemetry reporting", + "examples": [ + true + ] } }, "examples": [ diff --git a/charts/nginx-ingress/values.yaml b/charts/nginx-ingress/values.yaml index 8b1571b197..5e98237194 100644 --- a/charts/nginx-ingress/values.yaml +++ b/charts/nginx-ingress/values.yaml @@ -465,6 +465,9 @@ controller: ## Enable dynamic reloading of certificates enableSSLDynamicReload: true + ## Enable telemetry reporting + enableTelemetryReporting: true + rbac: ## Configures RBAC. create: true diff --git a/cmd/nginx-ingress/flags.go b/cmd/nginx-ingress/flags.go index da51ac0ebd..6972c4104e 100644 --- a/cmd/nginx-ingress/flags.go +++ b/cmd/nginx-ingress/flags.go @@ -1,6 +1,7 @@ package main import ( + "errors" "flag" "fmt" "net" @@ -8,6 +9,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/golang/glog" api_v1 "k8s.io/api/core/v1" @@ -201,6 +203,8 @@ var ( enableDynamicSSLReload = flag.Bool(dynamicSSLReloadParam, true, "Enable reloading of SSL Certificates without restarting the NGINX process.") + enableTelemetryReporting = flag.Bool("enable-telemetry-reporting", true, "Enable gathering and reporting of product related telemetry.") + startupCheckFn func() error ) @@ -489,3 +493,17 @@ func validateLocation(location string) error { } return nil } + +// validateReportingPeriod checks if the reporting period parameter can be parsed. +// +// This function will be deprecated in NIC v3.5. It is used only for demo and testing purpose. +func validateReportingPeriod(period string) error { + duration, err := time.ParseDuration(period) + if err != nil { + return err + } + if duration.Minutes() < 1 { + return errors.New("invalid reporting period, expected minimum 1m") + } + return nil +} diff --git a/cmd/nginx-ingress/flags_test.go b/cmd/nginx-ingress/flags_test.go index 1715d2a3e8..07a32a3f7f 100644 --- a/cmd/nginx-ingress/flags_test.go +++ b/cmd/nginx-ingress/flags_test.go @@ -172,3 +172,27 @@ func TestValidateNamespaces(t *testing.T) { } } } + +func TestValidateReportingPeriodWithInvalidInput(t *testing.T) { + t.Parallel() + + periods := []string{"", "-1", "1x", "abc", "-", "30s", "10ms", "0h"} + for _, p := range periods { + err := validateReportingPeriod(p) + if err == nil { + t.Errorf("want error on invalid period %s, got nil", p) + } + } +} + +func TestValidateReportingPeriodWithValidInput(t *testing.T) { + t.Parallel() + + periods := []string{"1m", "1h", "24h"} + for _, p := range periods { + err := validateReportingPeriod(p) + if err != nil { + t.Error(err) + } + } +} diff --git a/cmd/nginx-ingress/main.go b/cmd/nginx-ingress/main.go index d093318356..5df8f2c17b 100644 --- a/cmd/nginx-ingress/main.go +++ b/cmd/nginx-ingress/main.go @@ -41,7 +41,9 @@ import ( ) // Injected during build -var version string +var ( + version string +) const ( nginxVersionLabel = "app.nginx.org/version" @@ -199,6 +201,7 @@ func main() { ExternalDNSEnabled: *enableExternalDNS, IsIPV6Disabled: *disableIPV6, WatchNamespaceLabel: *watchNamespaceLabel, + EnableTelemetryReporting: *enableTelemetryReporting, } lbc := k8s.NewLoadBalancerController(lbcInput) diff --git a/internal/k8s/controller.go b/internal/k8s/controller.go index 45620e30b4..b99b2e66d7 100644 --- a/internal/k8s/controller.go +++ b/internal/k8s/controller.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "github.com/nginxinc/kubernetes-ingress/internal/telemetry" + "github.com/nginxinc/kubernetes-ingress/pkg/apis/dos/v1beta1" "golang.org/x/exp/maps" @@ -161,6 +163,8 @@ type LoadBalancerController struct { enableBatchReload bool isIPV6Disabled bool namespaceWatcherController cache.Controller + telemetryCollector *telemetry.Collector + telemetryChan chan struct{} } var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc @@ -206,6 +210,7 @@ type NewLoadBalancerControllerInput struct { ExternalDNSEnabled bool IsIPV6Disabled bool WatchNamespaceLabel string + EnableTelemetryReporting bool } // NewLoadBalancerController creates a controller @@ -271,6 +276,18 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc lbc.externalDNSController = ed_controller.NewController(ed_controller.BuildOpts(context.TODO(), lbc.namespaceList, lbc.recorder, lbc.confClient, input.ResyncPeriod, isDynamicNs)) } + // NIC Telemetry Reporting + if input.EnableTelemetryReporting { + lbc.telemetryChan = make(chan struct{}) + collector, err := telemetry.NewCollector( + telemetry.WithTimePeriod("24h"), + ) + if err != nil { + glog.Fatalf("failed to initialize telemetry collector: %v", err) + } + lbc.telemetryCollector = collector + } + glog.V(3).Infof("Nginx Ingress Controller has class: %v", input.IngressClass) lbc.namespacedInformers = make(map[string]*namespacedInformer) @@ -683,10 +700,22 @@ func (lbc *LoadBalancerController) Run() { if lbc.externalDNSController != nil { go lbc.externalDNSController.Run(lbc.ctx.Done()) } + if lbc.leaderElector != nil { go lbc.leaderElector.Run(lbc.ctx) } + if lbc.telemetryCollector != nil { + go func(ctx context.Context) { + select { + case <-lbc.telemetryChan: + lbc.telemetryCollector.Start(lbc.ctx) + case <-ctx.Done(): + return + } + }(lbc.ctx) + } + for _, nif := range lbc.namespacedInformers { nif.start() } diff --git a/internal/k8s/controller_test.go b/internal/k8s/controller_test.go index 4987336e3e..d34eef47c6 100644 --- a/internal/k8s/controller_test.go +++ b/internal/k8s/controller_test.go @@ -7,6 +7,9 @@ import ( "sort" "strings" "testing" + "time" + + "github.com/nginxinc/kubernetes-ingress/internal/telemetry" discovery_v1 "k8s.io/api/discovery/v1" @@ -3747,3 +3750,40 @@ func TestPreSyncSecrets(t *testing.T) { t.Errorf("GetSecret(%q) returned a reference without an expected error", unsupportedKey) } } + +func TestNewTelemetryCollector(t *testing.T) { + t.Parallel() + + testCases := []struct { + testCase string + input NewLoadBalancerControllerInput + expectedCollector telemetry.Collector + }{ + { + testCase: "New Telemetry Collector with default values", + input: NewLoadBalancerControllerInput{ + KubeClient: fake.NewSimpleClientset(), + EnableTelemetryReporting: true, + }, + expectedCollector: telemetry.Collector{ + Period: 24 * time.Hour, + Exporter: telemetry.DiscardExporter, + }, + }, + { + testCase: "New Telemetry Collector with Telemetry Reporting set to false", + input: NewLoadBalancerControllerInput{ + KubeClient: fake.NewSimpleClientset(), + EnableTelemetryReporting: false, + }, + expectedCollector: telemetry.Collector{}, + }, + } + + for _, tc := range testCases { + lbc := NewLoadBalancerController(tc.input) + if reflect.DeepEqual(tc.expectedCollector, lbc.telemetryCollector) { + t.Fatalf("Expected %x, but got %x", tc.expectedCollector, lbc.telemetryCollector) + } + } +} diff --git a/internal/k8s/leader.go b/internal/k8s/leader.go index f87186ad5f..7d3cf5ae45 100644 --- a/internal/k8s/leader.go +++ b/internal/k8s/leader.go @@ -58,6 +58,10 @@ func createLeaderHandler(lbc *LoadBalancerController) leaderelection.LeaderCallb return leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { glog.V(3).Info("started leading") + // Closing this channel allows the leader to start the telemetry reporting process + if lbc.telemetryChan != nil { + close(lbc.telemetryChan) + } if lbc.reportIngressStatus { ingresses := lbc.configuration.GetResourcesWithFilter(resourceFilter{Ingresses: true}) diff --git a/internal/nginx/fake_manager.go b/internal/nginx/fake_manager.go index 1e42c51ed8..65ccbcc578 100644 --- a/internal/nginx/fake_manager.go +++ b/internal/nginx/fake_manager.go @@ -103,7 +103,7 @@ func (fm *FakeManager) CreateDHParam(_ string) (string, error) { // Version provides a fake implementation of Version. func (*FakeManager) Version() Version { glog.V(3).Info("Printing nginx version") - return Version{} + return NewVersion("nginx version: nginx/1.25.3 (nginx-plus-r31)") } // Start provides a fake implementation of Start. diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go new file mode 100644 index 0000000000..a1c1a5272e --- /dev/null +++ b/internal/telemetry/telemetry.go @@ -0,0 +1,135 @@ +// Package telemetry provides functionality for collecting and exporting NIC telemetry data. +package telemetry + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" +) + +// DiscardExporter is a temporary exporter +// for discarding collected telemetry data. +var DiscardExporter = Exporter{Endpoint: io.Discard} + +// Exporter represents a temporary telemetry data exporter. +type Exporter struct { + Endpoint io.Writer +} + +// Export takes context and trace data and writes to the endpoint. +func (e *Exporter) Export(_ context.Context, td TraceData) error { + // Note: exporting functionality will be implemented in a separate module. + fmt.Fprintf(e.Endpoint, "%+v", td) + return nil +} + +// TraceData holds collected NIC telemetry data. +type TraceData struct { + // Count of VirtualServers + VSCount int + // Count of TransportServers + TSCount int + + // TODO + // Add more fields for NIC data points +} + +// Option is a functional option used for configuring TraceReporter. +type Option func(*Collector) error + +// WithTimePeriod configures reporting time on TraceReporter. +func WithTimePeriod(period string) Option { + return func(c *Collector) error { + d, err := time.ParseDuration(period) + if err != nil { + return err + } + c.Period = d + return nil + } +} + +// WithExporter configures telemetry collector to use given exporter. +// +// This may change in the future when we use exporter implemented +// in the external module. +func WithExporter(e Exporter) Option { + return func(c *Collector) error { + c.Exporter = e + return nil + } +} + +// Collector is NIC telemetry data collector. +type Collector struct { + Period time.Duration + + // Exporter is a temp exporter for exporting telemetry data. + // The concrete implementation will be implemented in a separate module. + Exporter Exporter +} + +// NewCollector takes 0 or more options and creates a new TraceReporter. +// If no options are provided, NewReporter returns TraceReporter +// configured to gather data every 24h. +func NewCollector(opts ...Option) (*Collector, error) { + c := Collector{ + Period: 24 * time.Hour, + Exporter: DiscardExporter, // Use DiscardExporter until the real exporter is available. + } + for _, o := range opts { + if err := o(&c); err != nil { + return nil, err + } + } + return &c, nil +} + +// BuildReport takes context and builds report from gathered telemetry data. +func (c *Collector) BuildReport(context.Context) (TraceData, error) { + dt := TraceData{} + + // TODO: Implement handling and logging errors for each collected data point + + return dt, nil +} + +// Collect collects and exports telemetry data. +// It exports data using provided exporter. +func (c *Collector) Collect(ctx context.Context) { + glog.V(3).Info("Collecting telemetry data") + traceData, err := c.BuildReport(ctx) + if err != nil { + glog.Errorf("Error collecting telemetry data: %v", err) + } + err = c.Exporter.Export(ctx, traceData) + if err != nil { + glog.Errorf("Error exporting telemetry data: %v", err) + } + glog.V(3).Infof("Exported telemetry data: %x", traceData) +} + +// Start starts running NIC Telemetry Collector. +func (c *Collector) Start(ctx context.Context) { + wait.JitterUntilWithContext(ctx, c.Collect, c.Period, 0.1, true) +} + +// GetVSCount returns number of VirtualServers in watched namespaces. +// +// Note: this is a placeholder function. +func (c *Collector) GetVSCount() int { + // Placeholder function + return 0 +} + +// GetTSCount returns number of TransportServers in watched namespaces. +// +// Note: this is a placeholder function. +func (c *Collector) GetTSCount() int { + // Placeholder function + return 0 +} diff --git a/internal/telemetry/telemetry_test.go b/internal/telemetry/telemetry_test.go new file mode 100644 index 0000000000..99ca921ae7 --- /dev/null +++ b/internal/telemetry/telemetry_test.go @@ -0,0 +1,61 @@ +package telemetry_test + +import ( + "bytes" + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/nginxinc/kubernetes-ingress/internal/telemetry" +) + +func TestCreateNewDefaultCollector(t *testing.T) { + t.Parallel() + + c, err := telemetry.NewCollector() + if err != nil { + t.Fatal(err) + } + + want := 24.0 + got := c.Period.Hours() + + if !cmp.Equal(want, got) { + t.Error(cmp.Diff(want, got)) + } +} + +func TestCreateNewCollectorWithCustomReportingPeriod(t *testing.T) { + t.Parallel() + + c, err := telemetry.NewCollector(telemetry.WithTimePeriod("4h")) + if err != nil { + t.Fatal(err) + } + + want := 4.0 + got := c.Period.Hours() + + if !cmp.Equal(want, got) { + t.Error(cmp.Diff(want, got)) + } +} + +func TestCreateNewCollectorWithCustomExporter(t *testing.T) { + t.Parallel() + + buf := &bytes.Buffer{} + exp := telemetry.Exporter{Endpoint: buf} + + c, err := telemetry.NewCollector(telemetry.WithExporter(exp)) + if err != nil { + t.Fatal(err) + } + c.Collect(context.Background()) + + want := "{VSCount:0 TSCount:0}" + got := buf.String() + if !cmp.Equal(want, got) { + t.Error(cmp.Diff(want, got)) + } +}