Skip to content

Commit

Permalink
[CNI] zap logger telemetry package (#2266)
Browse files: browse the repository at this point in the history
* zap logger telemetry package
  • Loading branch information
paulyufan2 authored Oct 4, 2023
1 parent 64f01b2 commit ae0c08c
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 34 deletions.
4 changes: 2 additions & 2 deletions cni/network/plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func rootExecute() error {
if err = netPlugin.Plugin.InitializeKeyValueStore(&config); err != nil {
printCNIError(fmt.Sprintf("Failed to initialize key-value store of network plugin: %v", err))

tb = telemetry.NewTelemetryBuffer()
tb = telemetry.NewTelemetryBuffer(logger)
if tberr := tb.Connect(); tberr != nil {
logger.Error("Cannot connect to telemetry service", zap.Error(tberr))
return errors.Wrap(err, "lock acquire error")
Expand Down Expand Up @@ -228,7 +228,7 @@ func rootExecute() error {

// Start the telemetry process if it is not already started. This must be done while holding the lock; otherwise
// multiple processes may create/kill the telemetry process concurrently, leaving it in an undesired state.
tb = telemetry.NewTelemetryBuffer()
tb = telemetry.NewTelemetryBuffer(logger)
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
defer tb.Close()

Expand Down
2 changes: 1 addition & 1 deletion cni/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (plugin *Plugin) DelegateDel(pluginName string, nwCfg *NetworkConfig) error
zap.String("plugin", pluginName),
zap.Any("config", nwCfg))
defer func() {
logger.Info("Plugin eturned",
logger.Info("Plugin returned",
zap.String("plugin", pluginName),
zap.Error(err))
}()
Expand Down
12 changes: 6 additions & 6 deletions cni/telemetry/service/telemetrymain.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ func main() {
logger.Info("Config after setting defaults", zap.Any("config", config))

// Cleaning up orphan socket if present
tbtemp := telemetry.NewTelemetryBuffer()
tbtemp := telemetry.NewTelemetryBuffer(logger)
tbtemp.Cleanup(telemetry.FdName)

for {
tb = telemetry.NewTelemetryBuffer()
tb = telemetry.NewTelemetryBuffer(logger)

for {
logger.Info("Starting telemetry server")
err = tb.StartServer()
if err == nil || tb.FdExists {
Expand All @@ -168,10 +168,10 @@ func main() {
GetEnvRetryWaitTimeInSecs: config.GetEnvRetryWaitTimeInSecs,
}

if telemetry.CreateAITelemetryHandle(aiConfig, config.DisableAll, config.DisableTrace, config.DisableMetric) != nil {
logger.Error("[Telemetry] AI Handle creation error", zap.Error(err))
if tb.CreateAITelemetryHandle(aiConfig, config.DisableAll, config.DisableTrace, config.DisableMetric) != nil {
logger.Error("AI Handle creation error", zap.Error(err))
}
logger.Info("[Telemetry] Report to host interval", zap.Duration("seconds", config.ReportToHostIntervalInSeconds))
logger.Info("Report to host interval", zap.Duration("seconds", config.ReportToHostIntervalInSeconds))
tb.PushData(context.Background())
telemetry.CloseAITelemetryHandle()
}
2 changes: 1 addition & 1 deletion cnms/service/networkmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func main() {
CNIReport: reportManager.Report.(*telemetry.CNIReport),
}

tb := telemetry.NewTelemetryBuffer()
tb := telemetry.NewTelemetryBuffer(nil)
tb.ConnectToTelemetryService(telemetryNumRetries, telemetryWaitTimeInMilliseconds)
defer tb.Close()

Expand Down
6 changes: 3 additions & 3 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,17 +430,17 @@ func sendRegisterNodeRequest(httpc *http.Client, httpRestService cns.HTTPService
func startTelemetryService(ctx context.Context) {
var config aitelemetry.AIConfig

err := telemetry.CreateAITelemetryHandle(config, false, false, false)
tb := telemetry.NewTelemetryBuffer(nil)
err := tb.CreateAITelemetryHandle(config, false, false, false)
if err != nil {
log.Errorf("AI telemetry handle creation failed..:%w", err)
return
}

tbtemp := telemetry.NewTelemetryBuffer()
tbtemp := telemetry.NewTelemetryBuffer(nil)
//nolint:errcheck // best effort to cleanup leaked pipe/socket before start
tbtemp.Cleanup(telemetry.FdName)

tb := telemetry.NewTelemetryBuffer()
err = tb.StartServer()
if err != nil {
log.Errorf("Telemetry service failed to start: %w", err)
Expand Down
8 changes: 6 additions & 2 deletions telemetry/aiwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ const (
waitTimeInSecs = 10
)

func CreateAITelemetryHandle(aiConfig aitelemetry.AIConfig, disableAll, disableMetric, disableTrace bool) error {
func (tb *TelemetryBuffer) CreateAITelemetryHandle(aiConfig aitelemetry.AIConfig, disableAll, disableMetric, disableTrace bool) error {
var err error

if disableAll {
log.Printf("Telemetry is disabled")
if tb.logger != nil {
tb.logger.Info("Telemetry is disabled")
} else {
log.Printf("Telemetry is disabled")
}
return ErrTelemetryDisabled
}

Expand Down
3 changes: 2 additions & 1 deletion telemetry/aiwrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestCreateAITelemetryHandle(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
err := CreateAITelemetryHandle(tt.aiConfig, tt.disableAll, tt.disableMetric, tt.disableTrace)
tb := NewTelemetryBuffer(nil)
err := tb.CreateAITelemetryHandle(tt.aiConfig, tt.disableAll, tt.disableMetric, tt.disableTrace)
if tt.wantErr {
require.Error(t, err)
return
Expand Down
11 changes: 8 additions & 3 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -112,7 +113,11 @@ func (reportMgr *ReportManager) SendReport(tb *TelemetryBuffer) error {
report, err = reportMgr.ReportToBytes()
if err == nil {
if _, err = tb.Write(report); err != nil {
log.Printf("telemetry write failed:%v", err)
if tb.logger != nil {
tb.logger.Error("telemetry write failed", zap.Error(err))
} else {
log.Printf("telemetry write failed:%v", err)
}
}
}
}
Expand Down Expand Up @@ -143,7 +148,7 @@ func SendCNIMetric(cniMetric *AIMetric, tb *TelemetryBuffer) error {
report, err = reportMgr.ReportToBytes()
if err == nil {
if _, err = tb.Write(report); err != nil {
log.Printf("Error writing to telemetry socket:%v", err)
tb.logger.Error("Error writing to telemetry socket", zap.Error(err))
}
}
}
Expand All @@ -157,7 +162,7 @@ func SendCNIEvent(tb *TelemetryBuffer, report *CNIReport) {
reportBytes, err := reportMgr.ReportToBytes()
if err == nil {
if _, err = tb.Write(reportBytes); err != nil {
log.Printf("Error writing to telemetry socket:%v", err)
tb.logger.Error("Error writing to telemetry socket", zap.Error(err))
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"testing"

"github.com/Azure/azure-container-networking/aitelemetry"
"github.com/Azure/azure-container-networking/cni/log"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

var telemetryTests = []struct {
Expand Down Expand Up @@ -51,7 +53,8 @@ var telemetryTests = []struct {
}

func TestMain(m *testing.M) {
tb := NewTelemetryBuffer()
telemetryLog := log.CNILogger.With(zap.String("component", "cni-telemetry"))
tb := NewTelemetryBuffer(telemetryLog)
_ = tb.Cleanup(FdName)
exitCode := m.Run()
os.Exit(exitCode)
Expand Down
47 changes: 39 additions & 8 deletions telemetry/telemetrybuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/platform"
"go.uber.org/zap"
)

// TelemetryConfig - telemetry config read by telemetry service
Expand Down Expand Up @@ -55,6 +56,7 @@ type TelemetryBuffer struct {
data chan interface{}
cancel chan bool
mutex sync.Mutex
logger *zap.Logger
}

// Buffer object holds the different types of reports
Expand All @@ -63,12 +65,13 @@ type Buffer struct {
}

// NewTelemetryBuffer - create a new TelemetryBuffer
func NewTelemetryBuffer() *TelemetryBuffer {
func NewTelemetryBuffer(logger *zap.Logger) *TelemetryBuffer {
var tb TelemetryBuffer

tb.data = make(chan interface{}, MaxNumReports)
tb.cancel = make(chan bool, 1)
tb.connections = make([]net.Conn, 0)
tb.logger = logger

return &tb
}
Expand All @@ -88,11 +91,19 @@ func (tb *TelemetryBuffer) StartServer() error {
err := tb.Listen(FdName)
if err != nil {
tb.FdExists = strings.Contains(err.Error(), "in use") || strings.Contains(err.Error(), "Access is denied")
log.Logf("Listen returns: %v", err.Error())
if tb.logger != nil {
tb.logger.Error("Listen returns", zap.Error(err))
} else {
log.Logf("Listen returns: %v", err.Error())
}
return err
}

log.Logf("Telemetry service started")
if tb.logger != nil {
tb.logger.Info("Telemetry service started")
} else {
log.Logf("Telemetry service started")
}
// Spawn server goroutine to handle incoming connections
go func() {
for {
Expand All @@ -109,7 +120,11 @@ func (tb *TelemetryBuffer) StartServer() error {
var tmp map[string]interface{}
err = json.Unmarshal(reportStr, &tmp)
if err != nil {
log.Logf("StartServer: unmarshal error:%v", err)
if tb.logger != nil {
tb.logger.Error("StartServer: unmarshal error", zap.Error(err))
} else {
log.Logf("StartServer: unmarshal error:%v", err)
}
return
}
if _, ok := tmp["CniSucceeded"]; ok {
Expand All @@ -121,7 +136,11 @@ func (tb *TelemetryBuffer) StartServer() error {
json.Unmarshal([]byte(reportStr), &aiMetric)
tb.data <- aiMetric
} else {
log.Logf("StartServer: default case:%+v...", tmp)
if tb.logger != nil {
tb.logger.Info("StartServer: default", zap.Any("case", tmp))
} else {
log.Logf("StartServer: default case:%+v...", tmp)
}
}
} else {
var index int
Expand All @@ -148,7 +167,11 @@ func (tb *TelemetryBuffer) StartServer() error {
}
}()
} else {
log.Logf("Telemetry Server accept error %v", err)
if tb.logger != nil {
tb.logger.Error("Telemetry Server accept error", zap.Error(err))
} else {
log.Logf("Telemetry Server accept error %v", err)
}
return
}
}
Expand Down Expand Up @@ -179,10 +202,18 @@ func (tb *TelemetryBuffer) PushData(ctx context.Context) {
push(report)
tb.mutex.Unlock()
case <-tb.cancel:
log.Logf("[Telemetry] server cancel event")
if tb.logger != nil {
tb.logger.Info("server cancel event")
} else {
log.Logf("[Telemetry] server cancel event")
}
return
case <-ctx.Done():
log.Logf("[Telemetry] received context done event")
if tb.logger != nil {
tb.logger.Info("received context done event")
} else {
log.Logf("[Telemetry] received context done event")
}
return
}
}
Expand Down
15 changes: 9 additions & 6 deletions telemetry/telemetrybuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"testing"
"time"

"github.com/Azure/azure-container-networking/cni/log"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

const telemetryConfig = "azure-vnet-telemetry.config"

func createTBServer(t *testing.T) (*TelemetryBuffer, func()) {
tbServer := NewTelemetryBuffer()
tbServer := NewTelemetryBuffer(nil)
err := tbServer.StartServer()
require.NoError(t, err)

Expand All @@ -25,7 +27,7 @@ func TestStartServer(t *testing.T) {
_, closeTBServer := createTBServer(t)
defer closeTBServer()

secondTBServer := NewTelemetryBuffer()
secondTBServer := NewTelemetryBuffer(nil)
err := secondTBServer.StartServer()
require.Error(t, err)
}
Expand All @@ -34,7 +36,8 @@ func TestConnect(t *testing.T) {
_, closeTBServer := createTBServer(t)
defer closeTBServer()

tbClient := NewTelemetryBuffer()
logger := log.TelemetryLogger.With(zap.String("component", "cni-telemetry"))
tbClient := NewTelemetryBuffer(logger)
err := tbClient.Connect()
require.NoError(t, err)

Expand All @@ -45,7 +48,7 @@ func TestServerConnClose(t *testing.T) {
tbServer, closeTBServer := createTBServer(t)
defer closeTBServer()

tbClient := NewTelemetryBuffer()
tbClient := NewTelemetryBuffer(nil)
err := tbClient.Connect()
require.NoError(t, err)
defer tbClient.Close()
Expand All @@ -61,7 +64,7 @@ func TestClientConnClose(t *testing.T) {
_, closeTBServer := createTBServer(t)
defer closeTBServer()

tbClient := NewTelemetryBuffer()
tbClient := NewTelemetryBuffer(nil)
err := tbClient.Connect()
require.NoError(t, err)
tbClient.Close()
Expand All @@ -71,7 +74,7 @@ func TestWrite(t *testing.T) {
_, closeTBServer := createTBServer(t)
defer closeTBServer()

tbClient := NewTelemetryBuffer()
tbClient := NewTelemetryBuffer(nil)
err := tbClient.Connect()
require.NoError(t, err)
defer tbClient.Close()
Expand Down

0 comments on commit ae0c08c

Please sign in to comment.