diff --git a/sinker/backend/pktvisor/pktvisor.go b/sinker/backend/pktvisor/pktvisor.go index bff9da513..8faaefc5a 100644 --- a/sinker/backend/pktvisor/pktvisor.go +++ b/sinker/backend/pktvisor/pktvisor.go @@ -27,13 +27,14 @@ type pktvisorBackend struct { } type context struct { - agent *pb.AgentInfoRes - agentID string - policyID string - policyName string - deviceID string - tags map[string]string - logger *zap.Logger + agent *pb.AgentInfoRes + agentID string + policyID string + policyName string + deviceID string + handlerLabel string + tags map[string]string + logger *zap.Logger } func (p pktvisorBackend) ProcessMetrics(agent *pb.AgentInfoRes, agentID string, data fleet.AgentMetricsRPCPayload) ([]prometheus.TimeSeries, error) { @@ -59,61 +60,76 @@ func (p pktvisorBackend) ProcessMetrics(agent *pb.AgentInfoRes, agentID string, } context := context{ - agent: agent, - agentID: agentID, - policyID: data.PolicyID, - policyName: data.PolicyName, - deviceID: "", - tags: tags, - logger: p.logger, + agent: agent, + agentID: agentID, + policyID: data.PolicyID, + policyName: data.PolicyName, + deviceID: "", + handlerLabel: "", + tags: tags, + logger: p.logger, } - stats := StatSnapshot{} - for _, handlerData := range metrics { + stats := make(map[string]StatSnapshot) + for handlerLabel, handlerData := range metrics { if data, ok := handlerData["pcap"]; ok { - err := mapstructure.Decode(data, &stats.Pcap) + sTmp := StatSnapshot{} + err := mapstructure.Decode(data, &sTmp.Pcap) if err != nil { p.logger.Error("error decoding pcap handler", zap.Error(err)) continue } + stats[handlerLabel] = sTmp } else if data, ok := handlerData["dns"]; ok { - err := mapstructure.Decode(data, &stats.DNS) + sTmp := StatSnapshot{} + err := mapstructure.Decode(data, &sTmp.DNS) if err != nil { p.logger.Error("error decoding dns handler", zap.Error(err)) continue } + stats[handlerLabel] = sTmp } else if data, ok := handlerData["packets"]; ok { - err := mapstructure.Decode(data, &stats.Packets) + sTmp := StatSnapshot{} + err := mapstructure.Decode(data, &sTmp.Packets) if err != nil { p.logger.Error("error decoding packets handler", zap.Error(err)) continue } + stats[handlerLabel] = sTmp } else if data, ok := handlerData["dhcp"]; ok { - err := mapstructure.Decode(data, &stats.DHCP) + sTmp := StatSnapshot{} + err := mapstructure.Decode(data, &sTmp.DHCP) if err != nil { p.logger.Error("error decoding dhcp handler", zap.Error(err)) continue } + stats[handlerLabel] = sTmp } else if data, ok := handlerData["flow"]; ok { - err := mapstructure.Decode(data, &stats.Flow) + sTmp := StatSnapshot{} + err := mapstructure.Decode(data, &sTmp.Flow) if err != nil { p.logger.Error("error decoding dhcp handler", zap.Error(err)) continue } + stats[handlerLabel] = sTmp } } return parseToProm(&context, stats), nil } -func parseToProm(ctxt *context, stats StatSnapshot) prometheus.TSList { - var tsList = prometheus.TSList{} - - statsMap := structs.Map(stats) - if stats.Flow != nil { - convertFlowToPromParticle(ctxt, statsMap, "", &tsList) - return tsList +func parseToProm(ctxt *context, statsMap map[string]StatSnapshot) prometheus.TSList { + var finalTs = prometheus.TSList{} + for handlerLabel, stats := range statsMap { + var tsList = prometheus.TSList{} + statsMap := structs.Map(stats) + ctxt.handlerLabel = handlerLabel + if stats.Flow != nil { + convertFlowToPromParticle(ctxt, statsMap, "", &tsList) + } else { + convertToPromParticle(ctxt, statsMap, "", &tsList) + } + finalTs = append(finalTs, tsList...) } - convertToPromParticle(ctxt, statsMap, "", &tsList) - return tsList + return finalTs } func convertToPromParticle(ctxt *context, statsMap map[string]interface{}, label string, tsList *prometheus.TSList) { @@ -273,6 +289,10 @@ func makePromParticle(ctxt *context, label string, k string, v interface{}, tsLi handleParticleError(ctxt, err) return tsList } + if err := labelsListFlag.Set("handler;" + ctxt.handlerLabel); err != nil { + handleParticleError(ctxt, err) + return tsList + } if ctxt.deviceID != "" { if err := labelsListFlag.Set("device;" + ctxt.deviceID); err != nil { handleParticleError(ctxt, err) diff --git a/sinker/backend/pktvisor/pktvisor_test.go b/sinker/backend/pktvisor/pktvisor_test.go index 90afcadcd..990b675de 100644 --- a/sinker/backend/pktvisor/pktvisor_test.go +++ b/sinker/backend/pktvisor/pktvisor_test.go @@ -66,6 +66,10 @@ func TestDHCPConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dhcp", + }, } cases := map[string]struct { @@ -306,6 +310,10 @@ func TestASNConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_packets", + }, { Name: "asn", Value: "36236/NETACTUATE", @@ -410,6 +418,10 @@ func TestGeoLocConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_packets", + }, { Name: "geo_loc", Value: "AS/Hong Kong/HCW/Central", @@ -491,6 +503,10 @@ func TestPCAPConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_pcap", + }, } cases := map[string]struct { @@ -625,6 +641,10 @@ func TestDNSConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, } cases := map[string]struct { @@ -733,32 +753,667 @@ func TestDNSConversion(t *testing.T) { }, }, }, - "DNSPayloadTopNodata": { + "DNSPayloadTopNodata": { + data: []byte(` +{ + "policy_dns": { + "dns": { + "top_nodata": [ + { + "estimate": 186, + "name": "89.187.189.231" + } + ] + } + } +}`), + expected: prometheus.TimeSeries{ + Labels: append(prependLabel(commonLabels, prometheus.Label{ + Name: "__name__", + Value: "dns_top_nodata", + }), prometheus.Label{ + Name: "qname", + Value: "89.187.189.231", + }), + Datapoint: prometheus.Datapoint{ + Value: 186, + }, + }, + }, + } + + for desc, c := range cases { + t.Run(desc, func(t *testing.T) { + data.Data = c.data + res, err := be.ProcessMetrics(agent, agentID.String(), data) + require.Nil(t, err, fmt.Sprintf("%s: unexpected error: %s", desc, err)) + var receivedLabel []prometheus.Label + var receivedDatapoint prometheus.Datapoint + for _, value := range res { + if c.expected.Labels[0] == value.Labels[0] { + if len(c.expected.Labels) < 7 { + receivedLabel = value.Labels + receivedDatapoint = value.Datapoint + } else { + if c.expected.Labels[6].Value == value.Labels[6].Value { + receivedLabel = value.Labels + receivedDatapoint = value.Datapoint + } + } + } + } + assert.True(t, reflect.DeepEqual(c.expected.Labels, receivedLabel), fmt.Sprintf("%s: expected %v got %v", desc, c.expected.Labels, receivedLabel)) + assert.Equal(t, c.expected.Datapoint.Value, receivedDatapoint.Value, fmt.Sprintf("%s: expected value %f got %f", desc, c.expected.Datapoint.Value, receivedDatapoint.Value)) + }) + } + +} + +func TestDNSRatesConversion(t *testing.T) { + var logger = zap.NewNop() + pktvisor.Register(logger) + + ownerID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + policyID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + agentID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + var agent = &pb.AgentInfoRes{ + OwnerID: ownerID.String(), + AgentName: "agent-test", + } + + data := fleet.AgentMetricsRPCPayload{ + PolicyID: policyID.String(), + PolicyName: "policy-test", + Datasets: nil, + Format: "json", + BEVersion: "1.0", + } + + be := backend.GetBackend("pktvisor") + + commonLabels := []prometheus.Label{ + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + { + Name: "quantile", + Value: "0.5", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + { + Name: "quantile", + Value: "0.9", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + { + Name: "quantile", + Value: "0.95", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + { + Name: "quantile", + Value: "0.99", + }, + } + + cases := map[string]struct { + data []byte + expectedLabels []prometheus.Label + expectedDatapoints []float64 + }{ + "DNSPayloadRatesTotal": { + data: []byte(` +{ + "policy_dns": { + "dns": { + "rates": { + "total": { + "p50": 0, + "p90": 1, + "p95": 2, + "p99": 6 + } + } + } + } +}`), + expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ + Name: "__name__", + Value: "dns_rates_total", + }), + expectedDatapoints: []float64{0, 1, 2, 6}, + }, + } + + for desc, c := range cases { + t.Run(desc, func(t *testing.T) { + data.Data = c.data + res, err := be.ProcessMetrics(agent, agentID.String(), data) + require.Nil(t, err, fmt.Sprintf("%s: unexpected error: %s", desc, err)) + var receivedLabel []prometheus.Label + var receivedDatapoint []float64 + + for _, value := range res { + if c.expectedLabels[0] == value.Labels[0] { + for _, labels := range value.Labels { + receivedLabel = append(receivedLabel, labels) + } + receivedDatapoint = append(receivedDatapoint, value.Datapoint.Value) + } + } + + assert.ElementsMatch(t, c.expectedLabels, receivedLabel, fmt.Sprintf("%s: expected %v got %v", desc, c.expectedLabels, receivedLabel)) + assert.ElementsMatch(t, c.expectedDatapoints, receivedDatapoint, fmt.Sprintf("%s: expected %v got %v", desc, c.expectedDatapoints, receivedDatapoint)) + }) + } + +} + +func TestDHCPRatesConversion(t *testing.T) { + var logger = zap.NewNop() + pktvisor.Register(logger) + + ownerID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + policyID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + agentID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + var agent = &pb.AgentInfoRes{ + OwnerID: ownerID.String(), + AgentName: "agent-test", + } + + data := fleet.AgentMetricsRPCPayload{ + PolicyID: policyID.String(), + PolicyName: "policy-test", + Datasets: nil, + Format: "json", + BEVersion: "1.0", + } + + be := backend.GetBackend("pktvisor") + + commonLabels := []prometheus.Label{ + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dhcp", + }, + { + Name: "quantile", + Value: "0.5", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dhcp", + }, + { + Name: "quantile", + Value: "0.9", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dhcp", + }, + { + Name: "quantile", + Value: "0.95", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dhcp", + }, + { + Name: "quantile", + Value: "0.99", + }, + } + + cases := map[string]struct { + data []byte + expectedLabels []prometheus.Label + expectedDatapoints []float64 + }{ + "DHCPPayloadRates": { + data: []byte(` +{ + "policy_dhcp": { + "dhcp": { + "rates": { + "total": { + "p50": 0, + "p90": 1, + "p95": 2, + "p99": 6 + } + } + } + } +}`), + expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ + Name: "__name__", + Value: "dhcp_rates_total", + }), + expectedDatapoints: []float64{0, 1, 2, 6}, + }, + } + + for desc, c := range cases { + t.Run(desc, func(t *testing.T) { + data.Data = c.data + res, err := be.ProcessMetrics(agent, agentID.String(), data) + require.Nil(t, err, fmt.Sprintf("%s: unexpected error: %s", desc, err)) + var receivedLabel []prometheus.Label + var receivedDatapoint []float64 + + for _, value := range res { + if c.expectedLabels[0] == value.Labels[0] { + for _, labels := range value.Labels { + receivedLabel = append(receivedLabel, labels) + } + receivedDatapoint = append(receivedDatapoint, value.Datapoint.Value) + } + } + + assert.ElementsMatch(t, c.expectedLabels, receivedLabel, fmt.Sprintf("%s: expected %v got %v", desc, c.expectedLabels, receivedLabel)) + assert.ElementsMatch(t, c.expectedDatapoints, receivedDatapoint, fmt.Sprintf("%s: expected %v got %v", desc, c.expectedDatapoints, receivedDatapoint)) + }) + } + +} + +func TestPacketsRatesConversion(t *testing.T) { + var logger = zap.NewNop() + pktvisor.Register(logger) + + ownerID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + policyID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + agentID, err := uuid.NewV4() + require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) + + var agent = &pb.AgentInfoRes{ + OwnerID: ownerID.String(), + AgentName: "agent-test", + } + + data := fleet.AgentMetricsRPCPayload{ + PolicyID: policyID.String(), + PolicyName: "policy-test", + Datasets: nil, + Format: "json", + BEVersion: "1.0", + } + + be := backend.GetBackend("pktvisor") + + commonLabels := []prometheus.Label{ + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + { + Name: "quantile", + Value: "0.5", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + { + Name: "quantile", + Value: "0.9", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + { + Name: "quantile", + Value: "0.95", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + { + Name: "quantile", + Value: "0.99", + }, + } + + cases := map[string]struct { + data []byte + expectedLabels []prometheus.Label + expectedDatapoints []float64 + }{ + "PacketsPayloadRatesPpsIn": { + data: []byte(` +{ + "policy_dns": { + "packets": { + "rates": { + "pps_in": { + "p50": 0, + "p90": 1, + "p95": 2, + "p99": 6 + } + } + } + } +}`), + expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ + Name: "__name__", + Value: "packets_rates_pps_in", + }), + expectedDatapoints: []float64{0, 1, 2, 6}, + }, + "PacketsPayloadRatesPpsTotal": { + data: []byte(` +{ + "policy_dns": { + "packets": { + "rates": { + "pps_total": { + "p50": 0, + "p90": 1, + "p95": 2, + "p99": 6 + } + } + } + } +}`), + expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ + Name: "__name__", + Value: "packets_rates_pps_total", + }), + expectedDatapoints: []float64{0, 1, 2, 6}, + }, + "PacketsPayloadRatesPpsOut": { data: []byte(` { "policy_dns": { - "dns": { - "top_nodata": [ - { - "estimate": 186, - "name": "89.187.189.231" - } - ] + "packets": { + "rates": { + "pps_out": { + "p50": 0, + "p90": 1, + "p95": 2, + "p99": 6 + } + } } } }`), - expected: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "dns_top_nodata", - }), prometheus.Label{ - Name: "qname", - Value: "89.187.189.231", - }), - Datapoint: prometheus.Datapoint{ - Value: 186, - }, - }, + expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ + Name: "__name__", + Value: "packets_rates_pps_out", + }), + expectedDatapoints: []float64{0, 1, 2, 6}, }, } @@ -768,28 +1423,25 @@ func TestDNSConversion(t *testing.T) { res, err := be.ProcessMetrics(agent, agentID.String(), data) require.Nil(t, err, fmt.Sprintf("%s: unexpected error: %s", desc, err)) var receivedLabel []prometheus.Label - var receivedDatapoint prometheus.Datapoint + var receivedDatapoint []float64 + for _, value := range res { - if c.expected.Labels[0] == value.Labels[0] { - if len(c.expected.Labels) < 7 { - receivedLabel = value.Labels - receivedDatapoint = value.Datapoint - } else { - if c.expected.Labels[6].Value == value.Labels[6].Value { - receivedLabel = value.Labels - receivedDatapoint = value.Datapoint - } + if c.expectedLabels[0] == value.Labels[0] { + for _, labels := range value.Labels { + receivedLabel = append(receivedLabel, labels) } + receivedDatapoint = append(receivedDatapoint, value.Datapoint.Value) } } - assert.True(t, reflect.DeepEqual(c.expected.Labels, receivedLabel), fmt.Sprintf("%s: expected %v got %v", desc, c.expected.Labels, receivedLabel)) - assert.Equal(t, c.expected.Datapoint.Value, receivedDatapoint.Value, fmt.Sprintf("%s: expected value %f got %f", desc, c.expected.Datapoint.Value, receivedDatapoint.Value)) + + assert.ElementsMatch(t, c.expectedLabels, receivedLabel, fmt.Sprintf("%s: expected %v got %v", desc, c.expectedLabels, receivedLabel)) + assert.ElementsMatch(t, c.expectedDatapoints, receivedDatapoint, fmt.Sprintf("%s: expected %v got %v", desc, c.expectedDatapoints, receivedDatapoint)) }) } } -func TestDNSRatesConversion(t *testing.T) { +func TestFlowRatesConversion(t *testing.T) { var logger = zap.NewNop() pktvisor.Register(logger) @@ -838,6 +1490,10 @@ func TestDNSRatesConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "quantile", Value: "0.5", @@ -862,6 +1518,10 @@ func TestDNSRatesConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "quantile", Value: "0.9", @@ -886,6 +1546,10 @@ func TestDNSRatesConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "quantile", Value: "0.95", @@ -910,6 +1574,10 @@ func TestDNSRatesConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "quantile", Value: "0.99", @@ -921,116 +1589,6 @@ func TestDNSRatesConversion(t *testing.T) { expectedLabels []prometheus.Label expectedDatapoints []float64 }{ - "DNSPayloadRatesTotal": { - data: []byte(` -{ - "policy_dns": { - "dns": { - "rates": { - "total": { - "p50": 0, - "p90": 1, - "p95": 2, - "p99": 6 - } - } - } - } -}`), - expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "dns_rates_total", - }), - expectedDatapoints: []float64{0, 1, 2, 6}, - }, - "PacketsPayloadRatesPpsIn": { - data: []byte(` -{ - "policy_dns": { - "packets": { - "rates": { - "pps_in": { - "p50": 0, - "p90": 1, - "p95": 2, - "p99": 6 - } - } - } - } -}`), - expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "packets_rates_pps_in", - }), - expectedDatapoints: []float64{0, 1, 2, 6}, - }, - "PacketsPayloadRatesPpsTotal": { - data: []byte(` -{ - "policy_dns": { - "packets": { - "rates": { - "pps_total": { - "p50": 0, - "p90": 1, - "p95": 2, - "p99": 6 - } - } - } - } -}`), - expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "packets_rates_pps_total", - }), - expectedDatapoints: []float64{0, 1, 2, 6}, - }, - "PacketsPayloadRatesPpsOut": { - data: []byte(` -{ - "policy_dns": { - "packets": { - "rates": { - "pps_out": { - "p50": 0, - "p90": 1, - "p95": 2, - "p99": 6 - } - } - } - } -}`), - expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "packets_rates_pps_out", - }), - expectedDatapoints: []float64{0, 1, 2, 6}, - }, - "DHCPPayloadRates": { - data: []byte(` -{ - "policy_dhcp": { - "dhcp": { - "rates": { - "total": { - "p50": 0, - "p90": 1, - "p95": 2, - "p99": 6 - } - } - } - } -}`), - expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "dhcp_rates_total", - }), - expectedDatapoints: []float64{0, 1, 2, 6}, - }, "FlowPayloadRatesBytes": { data: []byte(` { @@ -1200,6 +1758,10 @@ func TestDNSTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, { Name: "qname", Value: ".google.com", @@ -1250,6 +1812,10 @@ func TestDNSTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, { Name: "qname", Value: ".l.google.com", @@ -1300,6 +1866,10 @@ func TestDNSTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, { Name: "ecs", Value: "2001:470:1f0b:1600::", @@ -1350,6 +1920,10 @@ func TestDNSTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, { Name: "qtype", Value: "HTTPS", @@ -1400,6 +1974,10 @@ func TestDNSTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, { Name: "port", Value: "39783", @@ -1450,6 +2028,10 @@ func TestDNSTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, { Name: "rcode", Value: "NOERROR", @@ -1531,6 +2113,10 @@ func TestDNSWirePacketsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, } cases := map[string]struct { @@ -1881,6 +2467,10 @@ func TestDNSXactConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, } cases := map[string]struct { @@ -2111,6 +2701,10 @@ func TestPacketsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_dns", + }, } cases := map[string]struct { @@ -2452,29 +3046,6 @@ func TestPeriodConversion(t *testing.T) { be := backend.GetBackend("pktvisor") - commonLabels := []prometheus.Label{ - { - Name: "instance", - Value: "agent-test", - }, - { - Name: "agent_id", - Value: agentID.String(), - }, - { - Name: "agent", - Value: "agent-test", - }, - { - Name: "policy_id", - Value: policyID.String(), - }, - { - Name: "policy", - Value: "policy-test", - }, - } - cases := map[string]struct { data []byte expectedLength prometheus.TimeSeries @@ -2493,19 +3064,71 @@ func TestPeriodConversion(t *testing.T) { } }`), expectedLength: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "dns_period_length", - })), + Labels: []prometheus.Label{ + { + Name: "__name__", + Value: "dns_period_length", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + }, Datapoint: prometheus.Datapoint{ Value: 60, }, }, expectedStartTs: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "dns_period_start_ts", - })), + Labels: []prometheus.Label{ + { + Name: "__name__", + Value: "dns_period_start_ts", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dns", + }, + }, Datapoint: prometheus.Datapoint{ Value: 1624888107, }, @@ -2524,19 +3147,71 @@ func TestPeriodConversion(t *testing.T) { } }`), expectedLength: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "packets_period_length", - })), + Labels: []prometheus.Label{ + { + Name: "__name__", + Value: "packets_period_length", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_packets", + }, + }, Datapoint: prometheus.Datapoint{ Value: 60, }, }, expectedStartTs: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "packets_period_start_ts", - })), + Labels: []prometheus.Label{ + { + Name: "__name__", + Value: "packets_period_start_ts", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_packets", + }, + }, Datapoint: prometheus.Datapoint{ Value: 1624888107, }, @@ -2555,19 +3230,71 @@ func TestPeriodConversion(t *testing.T) { } }`), expectedLength: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "dhcp_period_length", - })), + Labels: []prometheus.Label{ + { + Name: "__name__", + Value: "dhcp_period_length", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dhcp", + }, + }, Datapoint: prometheus.Datapoint{ Value: 60, }, }, expectedStartTs: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "dhcp_period_start_ts", - })), + Labels: []prometheus.Label{ + { + Name: "__name__", + Value: "dhcp_period_start_ts", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_dhcp", + }, + }, Datapoint: prometheus.Datapoint{ Value: 1624888107, }, @@ -2586,19 +3313,71 @@ func TestPeriodConversion(t *testing.T) { } }`), expectedLength: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "flow_period_length", - })), + Labels: []prometheus.Label{ + { + Name: "__name__", + Value: "flow_period_length", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_flow", + }, + }, Datapoint: prometheus.Datapoint{ Value: 60, }, }, expectedStartTs: prometheus.TimeSeries{ - Labels: append(prependLabel(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "flow_period_start_ts", - })), + Labels: []prometheus.Label{ + { + Name: "__name__", + Value: "flow_period_start_ts", + }, + { + Name: "instance", + Value: "agent-test", + }, + { + Name: "agent_id", + Value: agentID.String(), + }, + { + Name: "agent", + Value: "agent-test", + }, + { + Name: "policy_id", + Value: policyID.String(), + }, + { + Name: "policy", + Value: "policy-test", + }, + { + Name: "handler", + Value: "policy_flow", + }, + }, Datapoint: prometheus.Datapoint{ Value: 1624888107, }, @@ -2682,6 +3461,10 @@ func TestFlowCardinalityConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, } cases := map[string]struct { @@ -2881,6 +3664,10 @@ func TestFlowConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, } cases := map[string]struct { @@ -3153,6 +3940,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3211,6 +4002,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3269,6 +4064,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3327,6 +4126,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3385,6 +4188,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3443,6 +4250,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3501,6 +4312,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3559,6 +4374,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3617,6 +4436,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3675,6 +4498,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3732,6 +4559,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3790,6 +4621,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3848,6 +4683,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3906,6 +4745,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -3964,6 +4807,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -4022,6 +4869,10 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_flow", + }, { Name: "device", Value: "192.168.4.7", @@ -4131,6 +4982,10 @@ func TestAgentTagsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_packets", + }, { Name: "testkey", Value: "testvalue", @@ -4245,6 +5100,10 @@ func TestTagsConversion(t *testing.T) { Name: "policy", Value: "policy-test", }, + { + Name: "handler", + Value: "policy_packets", + }, { Name: "asn", Value: "36236/NETACTUATE", @@ -4302,7 +5161,7 @@ func prependLabel(labelList []prometheus.Label, label prometheus.Label) []promet } func labelQuantiles(labelList []prometheus.Label, label prometheus.Label) []prometheus.Label { - for i := 0; i < 28; i += 7 { + for i := 0; i < 32; i += 8 { labelList = append(labelList[:i+1], labelList[i:]...) labelList[i] = label }