Skip to content

Commit

Permalink
add handler label to sinker from pktvisor JSON (#1776)
Browse files Browse the repository at this point in the history
* first pass at adding handler label to sinker

* fix metric collection

* fix some units

* test(sinker): update unit tests with handler label

* test(sinker): fix DNSPayloadRatesTotal unit test

Co-authored-by: mclcavalcante <mariana.cavalcante@encora.com>
  • Loading branch information
weyrick and mclcavalcante authored Sep 16, 2022
1 parent 3065954 commit b083020
Show file tree
Hide file tree
Showing 2 changed files with 1,108 additions and 229 deletions.
80 changes: 50 additions & 30 deletions sinker/backend/pktvisor/pktvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ type pktvisorBackend struct {
}

type context struct {
agent *pb.AgentInfoRes
agentID string
policyID string
policyName string
deviceID string
tags map[string]string
logger *zap.Logger
agent *pb.AgentInfoRes
agentID string
policyID string
policyName string
deviceID string
handlerLabel string
tags map[string]string
logger *zap.Logger
}

func (p pktvisorBackend) ProcessMetrics(agent *pb.AgentInfoRes, agentID string, data fleet.AgentMetricsRPCPayload) ([]prometheus.TimeSeries, error) {
Expand All @@ -59,61 +60,76 @@ func (p pktvisorBackend) ProcessMetrics(agent *pb.AgentInfoRes, agentID string,
}

context := context{
agent: agent,
agentID: agentID,
policyID: data.PolicyID,
policyName: data.PolicyName,
deviceID: "",
tags: tags,
logger: p.logger,
agent: agent,
agentID: agentID,
policyID: data.PolicyID,
policyName: data.PolicyName,
deviceID: "",
handlerLabel: "",
tags: tags,
logger: p.logger,
}
stats := StatSnapshot{}
for _, handlerData := range metrics {
stats := make(map[string]StatSnapshot)
for handlerLabel, handlerData := range metrics {
if data, ok := handlerData["pcap"]; ok {
err := mapstructure.Decode(data, &stats.Pcap)
sTmp := StatSnapshot{}
err := mapstructure.Decode(data, &sTmp.Pcap)
if err != nil {
p.logger.Error("error decoding pcap handler", zap.Error(err))
continue
}
stats[handlerLabel] = sTmp
} else if data, ok := handlerData["dns"]; ok {
err := mapstructure.Decode(data, &stats.DNS)
sTmp := StatSnapshot{}
err := mapstructure.Decode(data, &sTmp.DNS)
if err != nil {
p.logger.Error("error decoding dns handler", zap.Error(err))
continue
}
stats[handlerLabel] = sTmp
} else if data, ok := handlerData["packets"]; ok {
err := mapstructure.Decode(data, &stats.Packets)
sTmp := StatSnapshot{}
err := mapstructure.Decode(data, &sTmp.Packets)
if err != nil {
p.logger.Error("error decoding packets handler", zap.Error(err))
continue
}
stats[handlerLabel] = sTmp
} else if data, ok := handlerData["dhcp"]; ok {
err := mapstructure.Decode(data, &stats.DHCP)
sTmp := StatSnapshot{}
err := mapstructure.Decode(data, &sTmp.DHCP)
if err != nil {
p.logger.Error("error decoding dhcp handler", zap.Error(err))
continue
}
stats[handlerLabel] = sTmp
} else if data, ok := handlerData["flow"]; ok {
err := mapstructure.Decode(data, &stats.Flow)
sTmp := StatSnapshot{}
err := mapstructure.Decode(data, &sTmp.Flow)
if err != nil {
p.logger.Error("error decoding dhcp handler", zap.Error(err))
continue
}
stats[handlerLabel] = sTmp
}
}
return parseToProm(&context, stats), nil
}

func parseToProm(ctxt *context, stats StatSnapshot) prometheus.TSList {
var tsList = prometheus.TSList{}

statsMap := structs.Map(stats)
if stats.Flow != nil {
convertFlowToPromParticle(ctxt, statsMap, "", &tsList)
return tsList
func parseToProm(ctxt *context, statsMap map[string]StatSnapshot) prometheus.TSList {
var finalTs = prometheus.TSList{}
for handlerLabel, stats := range statsMap {
var tsList = prometheus.TSList{}
statsMap := structs.Map(stats)
ctxt.handlerLabel = handlerLabel
if stats.Flow != nil {
convertFlowToPromParticle(ctxt, statsMap, "", &tsList)
} else {
convertToPromParticle(ctxt, statsMap, "", &tsList)
}
finalTs = append(finalTs, tsList...)
}
convertToPromParticle(ctxt, statsMap, "", &tsList)
return tsList
return finalTs
}

func convertToPromParticle(ctxt *context, statsMap map[string]interface{}, label string, tsList *prometheus.TSList) {
Expand Down Expand Up @@ -273,6 +289,10 @@ func makePromParticle(ctxt *context, label string, k string, v interface{}, tsLi
handleParticleError(ctxt, err)
return tsList
}
if err := labelsListFlag.Set("handler;" + ctxt.handlerLabel); err != nil {
handleParticleError(ctxt, err)
return tsList
}
if ctxt.deviceID != "" {
if err := labelsListFlag.Set("device;" + ctxt.deviceID); err != nil {
handleParticleError(ctxt, err)
Expand Down
Loading

0 comments on commit b083020

Please sign in to comment.