Skip to content

Commit

Permalink
min / max / 90 / 99 percentiles
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Oct 18, 2023
1 parent d408c3c commit e3b2402
Show file tree
Hide file tree
Showing 50 changed files with 2,229 additions and 1,014 deletions.
16 changes: 11 additions & 5 deletions pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
)

const (
metricTypeKey = "type"
metricTypeKey = "type"
metricFunctionKey = "function"

aggregateByKey = "aggregateBy"
groupsKey = "groups"
rateIntervalKey = "rateInterval"
Expand Down Expand Up @@ -72,6 +74,10 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if err != nil {
return nil, http.StatusBadRequest, err
}
metricFunction, err := getMetricFunction(params)
if err != nil {
return nil, http.StatusBadRequest, err
}
recordType, err := getRecordType(params)
if err != nil {
return nil, http.StatusBadRequest, err
Expand All @@ -96,7 +102,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
// match any, and multiple filters => run in parallel then aggregate
var queries []string
for _, group := range filterGroups {
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return nil, code, errors.New("Can't build query: " + err.Error())
}
Expand All @@ -112,7 +118,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if len(filterGroups) > 0 {
filters = filterGroups[0]
}
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return nil, code, err
}
Expand Down Expand Up @@ -148,8 +154,8 @@ func expandReportersMergeQueries(queries filters.MultiQueries) filters.MultiQuer
return out
}

func buildTopologyQuery(cfg *loki.Config, queryFilters filters.SingleQuery, start, end, limit, rateInterval, step string, metricType constants.MetricType, recordType constants.RecordType, packetLoss constants.PacketLoss, aggregate, groups string) (string, int, error) {
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
func buildTopologyQuery(cfg *loki.Config, queryFilters filters.SingleQuery, start, end, limit, rateInterval, step string, metricType constants.MetricType, metricFunction constants.MetricFunction, recordType constants.RecordType, packetLoss constants.PacketLoss, aggregate, groups string) (string, int, error) {
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return "", http.StatusBadRequest, err
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/handler/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,30 @@ func getMetricType(params url.Values) (constants.MetricType, error) {
return "", fmt.Errorf("invalid metric type: %s", mt)
}

// getMetricFunction resolves the metric function requested through the
// metricFunctionKey query parameter.
//
// The metric type is parsed first (any error from getMetricType is returned
// unchanged). A metric function is only meaningful for the DNS latency and
// flow RTT metric types; for every other type the parameter is ignored and an
// empty MetricFunction is returned with a nil error.
//
// For latency metric types, a missing/empty parameter yields
// constants.DefaultMetricFunction; avg, min, max, p90 and p99 are accepted
// as-is; anything else produces an "invalid metric function" error.
func getMetricFunction(params url.Values) (constants.MetricFunction, error) {
	metricType, err := getMetricType(params)
	if err != nil {
		return "", err
	}

	switch metricType {
	case constants.MetricTypeDNSLatencies, constants.MetricTypeFlowRTT:
		// latency metric: continue below and parse the requested function
	default:
		// metric functions are not applicable to non-latency metrics
		return "", nil
	}

	raw := params.Get(metricFunctionKey)
	if raw == "" {
		return constants.DefaultMetricFunction, nil
	}

	switch fn := constants.MetricFunction(raw); fn {
	case constants.MetricFunctionAvg,
		constants.MetricFunctionMin,
		constants.MetricFunctionMax,
		constants.MetricFunctionP90,
		constants.MetricFunctionP99:
		return fn, nil
	default:
		return "", fmt.Errorf("invalid metric function: %s", raw)
	}
}

func getRateInterval(params url.Values) (string, error) {
rateInterval := params.Get(rateIntervalKey)
if rateInterval == "" {
Expand Down
75 changes: 57 additions & 18 deletions pkg/loki/topology_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Topology struct {
skipNonDNS bool
skipEmptyDNSRCode bool
skipEmptyRTT bool
scalar string
factor string
}

Expand All @@ -32,7 +33,7 @@ type TopologyQueryBuilder struct {
}

func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step string, metricType constants.MetricType,
recordType constants.RecordType, packetLoss constants.PacketLoss,
metricFunction constants.MetricFunction, recordType constants.RecordType, packetLoss constants.PacketLoss,
aggregate, groups string) (*TopologyQueryBuilder, error) {
l := limit
if len(l) == 0 {
Expand All @@ -41,10 +42,8 @@ func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step string,

fields := getFields(aggregate, groups)
var f, t string
factor := ""
factor, scalar := "", ""
switch metricType {
case constants.MetricTypeCount, constants.MetricTypeCountDNS:
f = "count_over_time"
case constants.MetricTypeDroppedPackets:
f = "rate"
t = "PktDropPackets"
Expand All @@ -54,14 +53,16 @@ func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step string,
case constants.MetricTypeDroppedBytes:
f = "rate"
t = "PktDropBytes"
case constants.MetricTypeDNSLatencies:
f = "avg_over_time"
t = "DnsLatencyMs"
case constants.MetricTypeBytes:
f = "rate"
t = "Bytes"
case constants.MetricTypeCount, constants.MetricTypeCountDNS:
f, scalar = getLatencyFunctionWithScalar(metricFunction)
case constants.MetricTypeDNSLatencies:
f, scalar = getLatencyFunctionWithScalar(metricFunction)
t = "DnsLatencyMs"
case constants.MetricTypeFlowRTT:
f = "avg_over_time"
f, scalar = getLatencyFunctionWithScalar(metricFunction)
t = "TimeFlowRttNs"
factor = "/1000000" // nanoseconds to milliseconds
}
Expand Down Expand Up @@ -90,6 +91,7 @@ func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step string,
skipNonDNS: metricType == constants.MetricTypeDNSLatencies || metricType == constants.MetricTypeCountDNS,
skipEmptyDNSRCode: aggregate == "dnsRCode",
skipEmptyRTT: metricType == constants.MetricTypeFlowRTT,
scalar: scalar,
factor: factor,
},
}, nil
Expand Down Expand Up @@ -139,10 +141,29 @@ func getFields(aggregate, groups string) string {
return strings.Join(fields[:], ",")
}

// getLatencyFunctionWithScalar translates a metric function into the pair
// (range function name, scalar argument) used when building the query.
//
// max, min and avg map to max_over_time, min_over_time and avg_over_time with
// no scalar. p90 and p99 both map to quantile_over_time, with "0.9" and
// "0.99" respectively as the scalar. Any other value (including the empty
// metric function) falls back to count_over_time with no scalar.
func getLatencyFunctionWithScalar(metricFunction constants.MetricFunction) (string, string) {
	const quantileFn = "quantile_over_time"

	// Start from the fallback and override it for recognised functions.
	fn, scalar := "count_over_time", ""

	switch metricFunction {
	case constants.MetricFunctionAvg:
		fn = "avg_over_time"
	case constants.MetricFunctionMin:
		fn = "min_over_time"
	case constants.MetricFunctionMax:
		fn = "max_over_time"
	case constants.MetricFunctionP90:
		fn, scalar = quantileFn, "0.9"
	case constants.MetricFunctionP99:
		fn, scalar = quantileFn, "0.99"
	}

	return fn, scalar
}

func (q *TopologyQueryBuilder) Build() string {
sumBy := q.topology.function == "rate" || q.topology.function == "count_over_time"

// Build topology query like:
// /<url path>?query=
// topk(
// topk | bottomk(
// <k>,
// <sum | avg> by(<aggregations>) (
// <function>(
Expand All @@ -153,19 +174,28 @@ func (q *TopologyQueryBuilder) Build() string {
// )
// &<query params>&step=<step>
sb := q.createStringBuilderURL()
sb.WriteString("topk(")
if q.topology.function == "min_over_time" {
sb.WriteString("bottomk")
} else {
sb.WriteString("topk")
}
sb.WriteRune('(')
sb.WriteString(q.topology.limit)
sb.WriteRune(',')
if q.topology.function == "avg_over_time" {
sb.WriteString("avg")
} else {
sb.WriteString("sum")

if sumBy {
sb.WriteString("sum by(")
sb.WriteString(q.topology.fields)
sb.WriteRune(')')
}
sb.WriteString(" by(")
sb.WriteString(q.topology.fields)
sb.WriteString(") (")

sb.WriteRune('(')
sb.WriteString(q.topology.function)
sb.WriteString("(")
if len(q.topology.scalar) > 0 {
sb.WriteString(q.topology.scalar)
sb.WriteRune(',')
}
q.appendLabels(sb)
q.appendLineFilters(sb)

Expand Down Expand Up @@ -198,10 +228,19 @@ func (q *TopologyQueryBuilder) Build() string {
sb.WriteString(q.topology.rateInterval)
}
sb.WriteString("])")

if !sumBy {
sb.WriteString(" by(")
sb.WriteString(q.topology.fields)
sb.WriteRune(')')
}
sb.WriteRune(')')

if len(q.topology.factor) > 0 {
sb.WriteString(q.topology.factor)
}
sb.WriteString("))")
sb.WriteRune(')')

q.appendQueryParams(sb)
sb.WriteString("&step=")
sb.WriteString(q.topology.step)
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ func TestLokiConfigurationForTopology(t *testing.T) {
req2 := lokiMock.Calls[1].Arguments[1].(*http.Request)
queries := []string{req1.URL.Query().Get("query"), req2.URL.Query().Get("query")}
expected := []string{
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection=~"^0$|^2$"}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection="1",DstK8S_OwnerName=""}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(rate({app="netobserv-flowcollector",FlowDirection=~"^0$|^2$"}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(rate({app="netobserv-flowcollector",FlowDirection="1",DstK8S_OwnerName=""}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
}
// We don't predict the order so sort both actual and expected
sort.Strings(queries)
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestLokiConfigurationForTableHistogram(t *testing.T) {
req1 := lokiMock.Calls[0].Arguments[1].(*http.Request)
query := req1.URL.Query().Get("query")
expected :=
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (count_over_time({app="netobserv-flowcollector"}|~` + "`" + `Duplicate":false` + "`" + `|json[30s])))`
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(count_over_time({app="netobserv-flowcollector"}|~` + "`" + `Duplicate":false` + "`" + `|json[30s])))`
assert.Equal(t, expected, query)

// without any multi-tenancy header
Expand Down
35 changes: 24 additions & 11 deletions pkg/utils/constants/constants.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package constants

type MetricType string
type MetricFunction string

type RecordType string
type PacketLoss string
type Direction string

const (
AppLabel = "app"
AppLabelValue = "netobserv-flowcollector"
RecordTypeLabel = "_RecordType"
AppLabel = "app"
AppLabelValue = "netobserv-flowcollector"
RecordTypeLabel = "_RecordType"

MetricTypeBytes MetricType = "bytes"
MetricTypePackets MetricType = "packets"
MetricTypeCount MetricType = "count"
Expand All @@ -18,20 +21,30 @@ const (
MetricTypeDroppedBytes MetricType = "droppedBytes"
MetricTypeDroppedPackets MetricType = "droppedPackets"
DefaultMetricType MetricType = MetricTypeBytes

MetricFunctionAvg MetricFunction = "avg"
MetricFunctionMin MetricFunction = "min"
MetricFunctionMax MetricFunction = "max"
MetricFunctionP90 MetricFunction = "p90"
MetricFunctionP99 MetricFunction = "p99"
DefaultMetricFunction MetricFunction = MetricFunctionAvg

RecordTypeAllConnections RecordType = "allConnections"
RecordTypeNewConnection RecordType = "newConnection"
RecordTypeHeartbeat RecordType = "heartbeat"
RecordTypeEndConnection RecordType = "endConnection"
RecordTypeLog RecordType = "flowLog"
DefaultRecordType RecordType = RecordTypeLog
PacketLossDropped PacketLoss = "dropped"
PacketLossHasDrop PacketLoss = "hasDrops"
PacketLossSent PacketLoss = "sent"
PacketLossAll PacketLoss = "all"
DefaultPacketLoss PacketLoss = PacketLossAll
Ingress Direction = "0"
Egress Direction = "1"
Inner Direction = "2"

PacketLossDropped PacketLoss = "dropped"
PacketLossHasDrop PacketLoss = "hasDrops"
PacketLossSent PacketLoss = "sent"
PacketLossAll PacketLoss = "all"
DefaultPacketLoss PacketLoss = PacketLossAll

Ingress Direction = "0"
Egress Direction = "1"
Inner Direction = "2"
)

var AnyConnectionType = []string{
Expand Down
Loading

0 comments on commit e3b2402

Please sign in to comment.