diff --git a/aitelemetry/telemetrywrapper.go b/aitelemetry/telemetrywrapper.go index 36c59c212a..7caacf2979 100644 --- a/aitelemetry/telemetrywrapper.go +++ b/aitelemetry/telemetrywrapper.go @@ -27,6 +27,7 @@ const ( azurePublicCloudStr = "AzurePublicCloud" hostNameKey = "hostname" defaultTimeout = 10 + maxCloseTimeoutInSeconds = 30 defaultBatchIntervalInSecs = 15 defaultBatchSizeInBytes = 32768 defaultGetEnvRetryCount = 5 @@ -330,8 +331,35 @@ func (th *telemetryHandle) Close(timeout int) { timeout = defaultTimeout } + // absolute wait is the larger of the timeout and maxCloseTimeoutInSeconds, so the backstop timer never fires before the channel close timeout + maxWaitTimeInSeconds := timeout + if maxWaitTimeInSeconds < maxCloseTimeoutInSeconds { + maxWaitTimeInSeconds = maxCloseTimeoutInSeconds + } + // wait for items to be sent otherwise timeout - <-th.client.Channel().Close(time.Duration(timeout) * time.Second) + // similar to the example in the appinsights-go repo: https://github.com/microsoft/ApplicationInsights-Go#shutdown + timer := time.NewTimer(time.Duration(maxWaitTimeInSeconds) * time.Second) + defer timer.Stop() + select { + case <-th.client.Channel().Close(time.Duration(timeout) * time.Second): + // timeout specified for retries. + + // If we got here, then all telemetry was submitted + // successfully, and we can proceed to exiting. + + case <-timer.C: + // absolute timeout. This covers any + // previous telemetry submission that may not have + // completed before Close was called. + + // There are a number of reasons we could have + // reached here. We gave it a go, but telemetry + // submission failed somewhere. Perhaps old events + // were still retrying, or perhaps we're throttled. + // Either way, we don't want to wait around for it + // to complete, so let's just exit. 
+ } // Remove diganostic message listener if th.diagListener != nil { diff --git a/npm/cmd/start.go b/npm/cmd/start.go index 223751ba16..b36d10422d 100644 --- a/npm/cmd/start.go +++ b/npm/cmd/start.go @@ -56,7 +56,10 @@ func newStartNPMCmd() *cobra.Command { KubeConfigPath: viper.GetString(flagKubeConfigPath), } - return start(*config, flags) + // start is blocking, unless there's an error + err = start(*config, flags) + metrics.Close() + return err }, } @@ -117,7 +120,10 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { klog.Infof("Resync period for NPM pod is set to %d.", int(resyncPeriod/time.Minute)) factory := informers.NewSharedInformerFactory(clientset, resyncPeriod) - k8sServerVersion := k8sServerVersion(clientset) + err = metrics.CreateTelemetryHandle(config.NPMVersion(), version, npm.GetAIMetadata()) + if err != nil { + klog.Infof("CreateTelemetryHandle failed with error %v. AITelemetry is not initialized.", err) + } var dp dataplane.GenericDataplane stopChannel := wait.NeverStop @@ -181,11 +187,9 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error { } dp.RunPeriodicTasks() } + + k8sServerVersion := k8sServerVersion(clientset) npMgr := npm.NewNetworkPolicyManager(config, factory, dp, exec.New(), version, k8sServerVersion) - err = metrics.CreateTelemetryHandle(config.NPMVersion(), version, npm.GetAIMetadata()) - if err != nil { - klog.Infof("CreateTelemetryHandle failed with error %v. 
AITelemetry is not initialized.", err) - } go restserver.NPMRestServerListenAndServe(config, npMgr) diff --git a/npm/metrics/ai-utils.go b/npm/metrics/ai-utils.go index 20de3009ff..068fdb8f6a 100644 --- a/npm/metrics/ai-utils.go +++ b/npm/metrics/ai-utils.go @@ -11,6 +11,8 @@ import ( "k8s.io/klog" ) +const telemetryCloseWaitTimeSeconds = 10 + var ( th aitelemetry.TelemetryHandle npmVersion int @@ -54,6 +56,15 @@ func CreateTelemetryHandle(npmVersionNum int, imageVersion, aiMetadata string) e return nil } +// Close cleans up the telemetry handle, which effectively waits for all telemetry data to be sent +func Close() { + if th == nil { + return + } + + th.Close(telemetryCloseWaitTimeSeconds) +} + // SendErrorLogAndMetric sends a metric through AI telemetry and sends a log to the Kusto Messages table func SendErrorLogAndMetric(operationID int, format string, args ...interface{}) { // Send error metrics diff --git a/npm/pkg/dataplane/ipsets/ipsetmanager_linux.go b/npm/pkg/dataplane/ipsets/ipsetmanager_linux.go index d654e589f3..1e1f2eaf7e 100644 --- a/npm/pkg/dataplane/ipsets/ipsetmanager_linux.go +++ b/npm/pkg/dataplane/ipsets/ipsetmanager_linux.go @@ -422,6 +422,7 @@ func (iMgr *IPSetManager) applyIPSets() error { msg := fmt.Sprintf("exceeded max consecutive failures (%d) when applying ipsets. 
final error: %s", maxConsecutiveFailures, restoreError.Error()) klog.Error(msg) metrics.SendErrorLogAndMetric(util.IpsmID, msg) + metrics.Close() panic(msg) } diff --git a/npm/pkg/dataplane/policies/policymanager_test.go b/npm/pkg/dataplane/policies/policymanager_test.go index 2ce9b27285..22add8eea0 100644 --- a/npm/pkg/dataplane/policies/policymanager_test.go +++ b/npm/pkg/dataplane/policies/policymanager_test.go @@ -115,7 +115,9 @@ func TestBootup(t *testing.T) { metrics.IncNumACLRules() require.NoError(t, pMgr.Bootup(epIDs)) - require.Equal(t, util.IptablesNft, util.Iptables) + if !util.IsWindowsDP() { + require.Equal(t, util.IptablesNft, util.Iptables) + } expectedNumACLs := 11 if util.IsWindowsDP() {