diff --git a/sinker/backend/pktvisor/pktvisor.go b/sinker/backend/pktvisor/pktvisor.go index 8faaefc5a..7c70e65c5 100644 --- a/sinker/backend/pktvisor/pktvisor.go +++ b/sinker/backend/pktvisor/pktvisor.go @@ -206,6 +206,10 @@ func convertFlowToPromParticle(ctxt *context, statsMap map[string]interface{}, l ipv4_regex := `^(((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.|$)){4})` if ok, _ := regexp.MatchString(ipv4_regex+`|`+ipv6_regex, key); ok { + if ok = strings.Contains(label, "Devices"); !ok { + return + } + label = strings.ReplaceAll(label, "Devices", "") ctxt.deviceID = key convertFlowToPromParticle(ctxt, statistic, label, tsList) } else { @@ -389,6 +393,10 @@ func topNMetricsParser(label string) (string, error) { mapNMetrics["TopSRVFAIL"] = "qname" mapNMetrics["TopUDPPorts"] = "port" mapNMetrics["TopSlow"] = "qname" + mapNMetrics["TopGeoLocBytes"] = "geo_loc" + mapNMetrics["TopGeoLocPackes"] = "geo_loc" + mapNMetrics["TopAsnBytes"] = "asn" + mapNMetrics["TopAsnPackets"] = "asn" mapNMetrics["TopDstIpsBytes"] = "ip" mapNMetrics["TopDstIpsPackets"] = "ip" mapNMetrics["TopSrcIpsBytes"] = "ip" diff --git a/sinker/backend/pktvisor/pktvisor_test.go b/sinker/backend/pktvisor/pktvisor_test.go index 990b675de..3df2a69b8 100644 --- a/sinker/backend/pktvisor/pktvisor_test.go +++ b/sinker/backend/pktvisor/pktvisor_test.go @@ -1441,251 +1441,6 @@ func TestPacketsRatesConversion(t *testing.T) { } -func TestFlowRatesConversion(t *testing.T) { - var logger = zap.NewNop() - pktvisor.Register(logger) - - ownerID, err := uuid.NewV4() - require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) - - policyID, err := uuid.NewV4() - require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) - - agentID, err := uuid.NewV4() - require.Nil(t, err, fmt.Sprintf("unexpected error: %s", err)) - - var agent = &pb.AgentInfoRes{ - OwnerID: ownerID.String(), - AgentName: "agent-test", - } - - data := fleet.AgentMetricsRPCPayload{ - PolicyID: policyID.String(), - PolicyName: "policy-test", - Datasets: nil, - Format: "json", - BEVersion: "1.0", - } - - be := backend.GetBackend("pktvisor") - - commonLabels := []prometheus.Label{ - { - Name: "instance", - Value: "agent-test", - }, - { - Name: "agent_id", - Value: agentID.String(), - }, - { - Name: "agent", - Value: "agent-test", - }, - { - Name: "policy_id", - Value: policyID.String(), - }, - { - Name: "policy", - Value: "policy-test", - }, - { - Name: "handler", - Value: "policy_flow", - }, - { - Name: "quantile", - Value: "0.5", - }, - { - Name: "instance", - Value: "agent-test", - }, - { - Name: "agent_id", - Value: agentID.String(), - }, - { - Name: "agent", - Value: "agent-test", - }, - { - Name: "policy_id", - Value: policyID.String(), - }, - { - Name: "policy", - Value: "policy-test", - }, - { - Name: "handler", - Value: "policy_flow", - }, - { - Name: "quantile", - Value: "0.9", - }, - { - Name: "instance", - Value: "agent-test", - }, - { - Name: "agent_id", - Value: agentID.String(), - }, - { - Name: "agent", - Value: "agent-test", - }, - { - Name: "policy_id", - Value: policyID.String(), - }, - { - Name: "policy", - Value: "policy-test", - }, - { - Name: "handler", - Value: "policy_flow", - }, - { - Name: "quantile", - Value: "0.95", - }, - { - Name: "instance", - Value: "agent-test", - }, - { - Name: "agent_id", - Value: agentID.String(), - }, - { - Name: "agent", - Value: "agent-test", - }, - { - Name: "policy_id", - Value: policyID.String(), - }, - { - Name: "policy", - Value: "policy-test", - }, - { - Name: "handler", - Value: "policy_flow", - }, - { - Name: "quantile", - Value: "0.99", - }, - } - - cases := map[string]struct { - data []byte - expectedLabels []prometheus.Label - expectedDatapoints []float64 - }{ - "FlowPayloadRatesBytes": { - data: []byte(` - { - "policy_flow": { - "flow": { - "rates": { - "bytes": { - "p50": 2, - "p90": 3, - "p95": 76811346, - "p99": 76811347 - } - } - } - } - }`), - expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "flow_rates_bytes", - }), - expectedDatapoints: []float64{2, 3, 76811346, 76811347}, - }, - "FlowPayloadRatesPackets": { - data: []byte(` - { - "policy_flow": { - "flow": { - "rates": { - "packets": { - "p50": 2, - "p90": 3, - "p95": 76811346, - "p99": 76811347 - } - } - } - } - }`), - expectedLabels: labelQuantiles(commonLabels, prometheus.Label{ - Name: "__name__", - Value: "flow_rates_packets", - }), - expectedDatapoints: []float64{2, 3, 76811346, 76811347}, - }, - "FlowPayloadSize": { - data: []byte(` - { - "policy_flow": { - "flow": { - "devices": { - "192.168.0.1": { - "payload_size": { - "p50": 2, - "p90": 3, - "p95": 76811346, - "p99": 76811347 - } - } - } - } - } - }`), - expectedLabels: labelQuantiles(labelQuantiles(commonLabels, prometheus.Label{ - Name: "device", - Value: "192.168.0.1", - }), prometheus.Label{ - Name: "__name__", - Value: "flow_devices_payload_size", - }), - expectedDatapoints: []float64{2, 3, 76811346, 76811347}, - }, - } - - for desc, c := range cases { - t.Run(desc, func(t *testing.T) { - data.Data = c.data - res, err := be.ProcessMetrics(agent, agentID.String(), data) - require.Nil(t, err, fmt.Sprintf("%s: unexpected error: %s", desc, err)) - var receivedLabel []prometheus.Label - var receivedDatapoint []float64 - - for _, value := range res { - if c.expectedLabels[0] == value.Labels[0] { - for _, labels := range value.Labels { - receivedLabel = append(receivedLabel, labels) - } - receivedDatapoint = append(receivedDatapoint, value.Datapoint.Value) - } - } - - assert.ElementsMatch(t, c.expectedLabels, receivedLabel, fmt.Sprintf("%s: expected %v got %v", desc, c.expectedLabels, receivedLabel)) - assert.ElementsMatch(t, c.expectedDatapoints, receivedDatapoint, fmt.Sprintf("%s: expected %v got %v", desc, c.expectedDatapoints, receivedDatapoint)) - }) - } - -} - func TestDNSTopKMetricsConversion(t *testing.T) { var logger = zap.NewNop() pktvisor.Register(logger) @@ -3492,7 +3247,7 @@ func TestFlowCardinalityConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_cardinality_dst_ips_out", + Value: "flow_cardinality_dst_ips_out", })), Datapoint: prometheus.Datapoint{ Value: 4, @@ -3522,7 +3277,7 @@ func TestFlowCardinalityConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_cardinality_dst_ports_out", + Value: "flow_cardinality_dst_ports_out", })), Datapoint: prometheus.Datapoint{ Value: 31, @@ -3551,7 +3306,7 @@ func TestFlowCardinalityConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_cardinality_src_ips_in", + Value: "flow_cardinality_src_ips_in", })), Datapoint: prometheus.Datapoint{ Value: 4, @@ -3579,7 +3334,7 @@ func TestFlowCardinalityConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_cardinality_src_ports_in", + Value: "flow_cardinality_src_ports_in", })), Datapoint: prometheus.Datapoint{ Value: 31, @@ -3674,38 +3429,38 @@ func TestFlowConversion(t *testing.T) { data []byte expected prometheus.TimeSeries }{ - "FlowPayloadFiltered": { + "FlowPayloadRecordsFiltered": { data: []byte(` { "policy_flow": { "flow": { - "filtered": 8 + "records_filtered": 8 } } }`), expected: prometheus.TimeSeries{ Labels: append(prependLabel(commonLabels, prometheus.Label{ Name: "__name__", - Value: "flow_filtered", + Value: "flow_records_filtered", })), Datapoint: prometheus.Datapoint{ Value: 8, }, }, }, - "FlowPayloadTotal": { + "FlowPayloadRecordsTotal": { data: []byte(` { "policy_flow": { "flow": { - "total": 8 + "records_total": 8 } } }`), expected: prometheus.TimeSeries{ Labels: append(prependLabel(commonLabels, prometheus.Label{ Name: "__name__", - Value: "flow_total", + Value: "flow_records_total", })), Datapoint: prometheus.Datapoint{ Value: 8, @@ -3731,7 +3486,7 @@ func TestFlowConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_ipv4", + Value: "flow_ipv4", })), Datapoint: prometheus.Datapoint{ Value: 52785, @@ -3757,7 +3512,7 @@ func TestFlowConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_ipv6", + Value: "flow_ipv6", })), Datapoint: prometheus.Datapoint{ Value: 52785, @@ -3783,7 +3538,7 @@ func TestFlowConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_other_l4", + Value: "flow_other_l4", })), Datapoint: prometheus.Datapoint{ Value: 52785, @@ -3809,7 +3564,7 @@ func TestFlowConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_tcp", + Value: "flow_tcp", })), Datapoint: prometheus.Datapoint{ Value: 52785, @@ -3835,7 +3590,7 @@ func TestFlowConversion(t *testing.T) { Value: "192.168.4.7", }), prometheus.Label{ Name: "__name__", - Value: "flow_devices_udp", + Value: "flow_udp", })), Datapoint: prometheus.Datapoint{ Value: 52785, @@ -3918,7 +3673,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_dst_ips_and_port_bytes", + Value: "flow_top_dst_ips_and_port_bytes", }, { Name: "instance", @@ -3980,7 +3735,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_dst_ips_and_port_packets", + Value: "flow_top_dst_ips_and_port_packets", }, { Name: "instance", @@ -4042,7 +3797,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_dst_ips_bytes", + Value: "flow_top_dst_ips_bytes", }, { Name: "instance", @@ -4104,7 +3859,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_dst_ips_packets", + Value: "flow_top_dst_ips_packets", }, { Name: "instance", @@ -4166,7 +3921,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_dst_ports_bytes", + Value: "flow_top_dst_ports_bytes", }, { Name: "instance", @@ -4228,7 +3983,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_dst_ports_packets", + Value: "flow_top_dst_ports_packets", }, { Name: "instance", @@ -4290,7 +4045,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_in_if_index_bytes", + Value: "flow_top_in_if_index_bytes", }, { Name: "instance", @@ -4352,7 +4107,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_in_if_index_packets", + Value: "flow_top_in_if_index_packets", }, { Name: "instance", @@ -4414,7 +4169,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_out_if_index_bytes", + Value: "flow_top_out_if_index_bytes", }, { Name: "instance", @@ -4476,7 +4231,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_out_if_index_packets", + Value: "flow_top_out_if_index_packets", }, { Name: "instance", @@ -4537,7 +4292,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_src_ips_and_port_bytes", + Value: "flow_top_src_ips_and_port_bytes", }, { Name: "instance", @@ -4599,7 +4354,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_src_ips_and_port_packets", + Value: "flow_top_src_ips_and_port_packets", }, { Name: "instance", @@ -4661,7 +4416,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_src_ips_bytes", + Value: "flow_top_src_ips_bytes", }, { Name: "instance", @@ -4723,7 +4478,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_src_ips_packets", + Value: "flow_top_src_ips_packets", }, { Name: "instance", @@ -4785,7 +4540,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_src_ports_bytes", + Value: "flow_top_src_ports_bytes", }, { Name: "instance", @@ -4847,7 +4602,7 @@ func TestFlowTopKMetricsConversion(t *testing.T) { Labels: []prometheus.Label{ { Name: "__name__", - Value: "flow_devices_top_src_ports_packets", + Value: "flow_top_src_ports_packets", }, { Name: "instance", diff --git a/sinker/backend/pktvisor/types.go b/sinker/backend/pktvisor/types.go index baa61fa93..32d943774 100644 --- a/sinker/backend/pktvisor/types.go +++ b/sinker/backend/pktvisor/types.go @@ -177,16 +177,16 @@ type FlowPayload struct { SrcIpsIn int64 `mapstructure:"src_ips_in"` SrcPortsIn int64 `mapstructure:"src_ports_in"` } `mapstructure:"cardinality"` - Filtered int64 `mapstructure:"filtered"` - Flows int64 `mapstructure:"flows"` + RecordsFiltered int64 `mapstructure:"records_filtered"` Ipv4 int64 `mapstructure:"ipv4"` Ipv6 int64 `mapstructure:"ipv6"` OtherL4 int64 `mapstructure:"other_l4"` - PayloadSize Quantiles `mapstructure:"payload_size"` TCP int64 `mapstructure:"tcp"` UDP int64 `mapstructure:"udp"` - TopGeoLoc []NameCount `mapstructure:"top_geoLoc"` - TopASN []NameCount `mapstructure:"top_asn"` + TopGeoLocBytes []NameCount `mapstructure:"top_geoLoc_bytes"` + TopGeoLocPackets []NameCount `mapstructure:"top_geoLoc_packets"` + TopAsnBytes []NameCount `mapstructure:"top_ASN_bytes"` + TopAsnPackets []NameCount `mapstructure:"top_ASN_packets"` TopDstIpsAndPortBytes []NameCount `mapstructure:"top_dst_ips_and_port_bytes"` TopDstIpsAndPortPackets []NameCount `mapstructure:"top_dst_ips_and_port_packets"` TopDstIpsBytes []NameCount `mapstructure:"top_dst_ips_bytes"` @@ -205,22 +205,12 @@ type FlowPayload struct { TopSrcIpsPackets []NameCount `mapstructure:"top_src_ips_packets"` TopSrcPortsBytes []NameCount `mapstructure:"top_src_ports_bytes"` TopSrcPortsPackets []NameCount `mapstructure:"top_src_ports_packets"` - Total int64 `mapstructure:"total"` + RecordsTotal int64 `mapstructure:"records_total"` Udp int64 `mapstructure:"udp"` } `mapstructure:"devices"` - DeepSamples int64 `mapstructure:"deep_samples"` - Events int64 `mapstructure:"events"` - Filtered int64 `mapstructure:"filtered"` - Total int64 `mapstructure:"total"` - Period PeriodPayload `mapstructure:"period"` - Rates struct { - Bytes Rates `mapstructure:"bytes"` - Packets Rates `mapstructure:"packets"` - Events Rates `mapstructure:"events"` - } `mapstructure:"rates"` - Volume struct { - Bytes Quantiles `mapstructure:"bytes"` - } `mapstructure:"volume"` + RecordsFiltered int64 `mapstructure:"records_filtered"` + RecordsTotal int64 `mapstructure:"records_total"` + Period PeriodPayload `mapstructure:"period"` } // StatSnapshot is a snapshot of a given period from pktvisord