Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport] [NPM] Backport NPM Changes for v1.5.41 #3344

Open
wants to merge 4 commits into
base: release/v1.5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion aitelemetry/telemetrywrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
azurePublicCloudStr = "AzurePublicCloud"
hostNameKey = "hostname"
defaultTimeout = 10
maxCloseTimeoutInSeconds = 30
defaultBatchIntervalInSecs = 15
defaultBatchSizeInBytes = 32768
defaultGetEnvRetryCount = 5
Expand Down Expand Up @@ -330,8 +331,35 @@ func (th *telemetryHandle) Close(timeout int) {
timeout = defaultTimeout
}

// max wait is the minimum of the timeout and maxCloseTimeoutInSeconds
maxWaitTimeInSeconds := timeout
if maxWaitTimeInSeconds < maxCloseTimeoutInSeconds {
maxWaitTimeInSeconds = maxCloseTimeoutInSeconds
}

// wait for items to be sent otherwise timeout
<-th.client.Channel().Close(time.Duration(timeout) * time.Second)
// similar to the example in the appinsights-go repo: https://github.com/microsoft/ApplicationInsights-Go#shutdown
timer := time.NewTimer(time.Duration(maxWaitTimeInSeconds) * time.Second)
defer timer.Stop()
select {
case <-th.client.Channel().Close(time.Duration(timeout) * time.Second):
// timeout specified for retries.

// If we got here, then all telemetry was submitted
// successfully, and we can proceed to exiting.

case <-timer.C:
// absolute timeout. This covers any
// previous telemetry submission that may not have
// completed before Close was called.

// There are a number of reasons we could have
// reached here. We gave it a go, but telemetry
// submission failed somewhere. Perhaps old events
// were still retrying, or perhaps we're throttled.
// Either way, we don't want to wait around for it
// to complete, so let's just exit.
}

// Remove diganostic message listener
if th.diagListener != nil {
Expand Down
16 changes: 10 additions & 6 deletions npm/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ func newStartNPMCmd() *cobra.Command {
KubeConfigPath: viper.GetString(flagKubeConfigPath),
}

return start(*config, flags)
// start is blocking, unless there's an error
err = start(*config, flags)
metrics.Close()
return err
},
}

Expand Down Expand Up @@ -117,7 +120,10 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
klog.Infof("Resync period for NPM pod is set to %d.", int(resyncPeriod/time.Minute))
factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)

k8sServerVersion := k8sServerVersion(clientset)
err = metrics.CreateTelemetryHandle(config.NPMVersion(), version, npm.GetAIMetadata())
if err != nil {
klog.Infof("CreateTelemetryHandle failed with error %v. AITelemetry is not initialized.", err)
}

var dp dataplane.GenericDataplane
stopChannel := wait.NeverStop
Expand Down Expand Up @@ -181,11 +187,9 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
}
dp.RunPeriodicTasks()
}

k8sServerVersion := k8sServerVersion(clientset)
npMgr := npm.NewNetworkPolicyManager(config, factory, dp, exec.New(), version, k8sServerVersion)
err = metrics.CreateTelemetryHandle(config.NPMVersion(), version, npm.GetAIMetadata())
if err != nil {
klog.Infof("CreateTelemetryHandle failed with error %v. AITelemetry is not initialized.", err)
}

go restserver.NPMRestServerListenAndServe(config, npMgr)

Expand Down
11 changes: 11 additions & 0 deletions npm/metrics/ai-utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"k8s.io/klog"
)

const telemetryCloseWaitTimeSeconds = 10

var (
th aitelemetry.TelemetryHandle
npmVersion int
Expand Down Expand Up @@ -54,6 +56,15 @@ func CreateTelemetryHandle(npmVersionNum int, imageVersion, aiMetadata string) e
return nil
}

// Close cleans up the telemetry handle, which effectively waits for all telemetry data to be sent
func Close() {
if th == nil {
return
}

th.Close(telemetryCloseWaitTimeSeconds)
}

// SendErrorLogAndMetric sends a metric through AI telemetry and sends a log to the Kusto Messages table
func SendErrorLogAndMetric(operationID int, format string, args ...interface{}) {
// Send error metrics
Expand Down
1 change: 1 addition & 0 deletions npm/pkg/dataplane/ipsets/ipsetmanager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func (iMgr *IPSetManager) applyIPSets() error {
msg := fmt.Sprintf("exceeded max consecutive failures (%d) when applying ipsets. final error: %s", maxConsecutiveFailures, restoreError.Error())
klog.Error(msg)
metrics.SendErrorLogAndMetric(util.IpsmID, msg)
metrics.Close()
panic(msg)
}

Expand Down
4 changes: 3 additions & 1 deletion npm/pkg/dataplane/policies/policymanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func TestBootup(t *testing.T) {
metrics.IncNumACLRules()

require.NoError(t, pMgr.Bootup(epIDs))
require.Equal(t, util.IptablesNft, util.Iptables)
if !util.IsWindowsDP() {
require.Equal(t, util.IptablesNft, util.Iptables)
}

expectedNumACLs := 11
if util.IsWindowsDP() {
Expand Down
Loading