Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1227 NETOBSERV-1388: Max / P90 / P99 graphs #412

Merged
merged 3 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
)

const (
metricTypeKey = "type"
metricTypeKey = "type"
metricFunctionKey = "function"

aggregateByKey = "aggregateBy"
groupsKey = "groups"
rateIntervalKey = "rateInterval"
Expand Down Expand Up @@ -72,6 +74,10 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if err != nil {
return nil, http.StatusBadRequest, err
}
metricFunction, err := getMetricFunction(params)
if err != nil {
return nil, http.StatusBadRequest, err
}
recordType, err := getRecordType(params)
if err != nil {
return nil, http.StatusBadRequest, err
Expand All @@ -96,7 +102,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
// match any, and multiple filters => run in parallel then aggregate
var queries []string
for _, group := range filterGroups {
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return nil, code, errors.New("Can't build query: " + err.Error())
}
Expand All @@ -112,7 +118,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if len(filterGroups) > 0 {
filters = filterGroups[0]
}
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return nil, code, err
}
Expand Down Expand Up @@ -148,8 +154,8 @@ func expandReportersMergeQueries(queries filters.MultiQueries) filters.MultiQuer
return out
}

func buildTopologyQuery(cfg *loki.Config, queryFilters filters.SingleQuery, start, end, limit, rateInterval, step string, metricType constants.MetricType, recordType constants.RecordType, packetLoss constants.PacketLoss, aggregate, groups string) (string, int, error) {
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
func buildTopologyQuery(cfg *loki.Config, queryFilters filters.SingleQuery, start, end, limit, rateInterval, step string, metricType constants.MetricType, metricFunction constants.MetricFunction, recordType constants.RecordType, packetLoss constants.PacketLoss, aggregate, groups string) (string, int, error) {
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return "", http.StatusBadRequest, err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/handler/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ func getMetricType(params url.Values) (constants.MetricType, error) {
return "", fmt.Errorf("invalid metric type: %s", mt)
}

// getMetricFunction extracts and validates the optional "function" query
// parameter. An absent parameter is valid and yields the empty function,
// letting the caller fall back to a metric-type-based default.
func getMetricFunction(params url.Values) (constants.MetricFunction, error) {
	raw := params.Get(metricFunctionKey)
	if raw == "" {
		return "", nil
	}
	fn := constants.MetricFunction(raw)
	switch fn {
	case constants.MetricFunctionAvg,
		constants.MetricFunctionMin,
		constants.MetricFunctionMax,
		constants.MetricFunctionP90,
		constants.MetricFunctionP99:
		return fn, nil
	}
	return "", fmt.Errorf("invalid metric function: %s", raw)
}

func getRateInterval(params url.Values) (string, error) {
rateInterval := params.Get(rateIntervalKey)
if rateInterval == "" {
Expand Down
19 changes: 14 additions & 5 deletions pkg/loki/flow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,19 +265,28 @@ func (q *FlowQueryBuilder) appendPktDropCauseFilter(sb *strings.Builder) {
}

func (q *FlowQueryBuilder) appendDNSFilter(sb *strings.Builder) {
// ensure at least one Dns field is specified
// |~`"Dns`
// ensure at least one Dns field is specified except DnsErrno
// |~`"DnsId`|~`"DnsLatencyMs`|~`"DnsFlagsResponseCode"`
sb.WriteString("|~`")
sb.WriteString(`"Dns`)
sb.WriteString(`"DnsId`)
sb.WriteString("`")
sb.WriteString("|~`")
sb.WriteString(`"DnsLatencyMs`)
sb.WriteString("`")
sb.WriteString("|~`")
sb.WriteString(`"DnsFlagsResponseCode"`)
sb.WriteString("`")
}

func (q *FlowQueryBuilder) appendDNSLatencyFilter(sb *strings.Builder) {
// ensure DnsLatencyMs field is specified
// |~`"DnsLatencyMs`
// ensure DnsLatencyMs field is specified and value is not zero
// |~`"DnsLatencyMs`!~`DnsLatencyMs%22:0[,}]`
sb.WriteString("|~`")
sb.WriteString(`"DnsLatencyMs`)
sb.WriteString("`")
sb.WriteString("!~`")
sb.WriteString(`"DnsLatencyMs":0[,}]`)
sb.WriteString("`")
}

func (q *FlowQueryBuilder) appendDNSRCodeFilter(sb *strings.Builder) {
Expand Down
140 changes: 94 additions & 46 deletions pkg/loki/topology_query.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loki

import (
"fmt"
"strings"

"github.com/netobserv/network-observability-console-plugin/pkg/utils"
Expand All @@ -17,13 +18,14 @@ type Topology struct {
step string
function string
dataField string
fields string
labels string
skipEmptyDropState bool
skipEmptyDropCause bool
skipNonDNS bool
skipEmptyDNSLatency bool
skipEmptyDNSRCode bool
skipEmptyRTT bool
scalar string
factor string
}

Expand All @@ -33,71 +35,49 @@ type TopologyQueryBuilder struct {
}

func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step string, metricType constants.MetricType,
recordType constants.RecordType, packetLoss constants.PacketLoss,
metricFunction constants.MetricFunction, recordType constants.RecordType, packetLoss constants.PacketLoss,
aggregate, groups string) (*TopologyQueryBuilder, error) {
l := limit
if len(l) == 0 {
l = topologyDefaultLimit
}

fields := getFields(aggregate, groups)
var f, t string
factor := ""
switch metricType {
case constants.MetricTypeCount, constants.MetricTypeCountDNS:
f = "count_over_time"
case constants.MetricTypeDroppedPackets:
f = "rate"
t = "PktDropPackets"
case constants.MetricTypePackets:
f = "rate"
t = "Packets"
case constants.MetricTypeDroppedBytes:
f = "rate"
t = "PktDropBytes"
case constants.MetricTypeDNSLatencies:
f = "avg_over_time"
t = "DnsLatencyMs"
case constants.MetricTypeBytes:
f = "rate"
t = "Bytes"
case constants.MetricTypeFlowRTT:
f = "avg_over_time"
t = "TimeFlowRttNs"
factor = "/1000000" // nanoseconds to milliseconds
}
labels := getLabels(aggregate, groups)
field, factor := getFieldsAndFactor(metricType)
f, scalar := getFunctionWithScalar(metricType, metricFunction)

var d bool
var dedup bool
var rt constants.RecordType
if utils.Contains(constants.AnyConnectionType, string(recordType)) {
d = false
dedup = false
rt = "endConnection"
} else {
d = true
dedup = true
rt = "flowLog"
}

return &TopologyQueryBuilder{
FlowQueryBuilder: NewFlowQueryBuilder(cfg, start, end, limit, d, rt, packetLoss),
FlowQueryBuilder: NewFlowQueryBuilder(cfg, start, end, limit, dedup, rt, packetLoss),
topology: &Topology{
rateInterval: rateInterval,
step: step,
limit: l,
function: f,
dataField: t,
fields: fields,
dataField: field,
factor: factor,
labels: labels,
skipEmptyDropState: aggregate == "droppedState",
skipEmptyDropCause: aggregate == "droppedCause",
skipNonDNS: metricType == constants.MetricTypeCountDNS,
skipEmptyDNSLatency: metricType == constants.MetricTypeDNSLatencies,
skipEmptyDNSRCode: aggregate == "dnsRCode",
skipEmptyRTT: metricType == constants.MetricTypeFlowRTT,
factor: factor,
scalar: scalar,
},
}, nil
}

func getFields(aggregate, groups string) string {
func getLabels(aggregate, groups string) string {
var fields []string
switch aggregate {
case "app":
Expand Down Expand Up @@ -141,10 +121,60 @@ func getFields(aggregate, groups string) string {
return strings.Join(fields[:], ",")
}

// getFieldsAndFactor maps a metric type to the Loki JSON field to unwrap and
// an optional scaling expression appended to the query (used here for unit
// conversion). Count-based metrics have no field to unwrap, hence empty values.
func getFieldsAndFactor(metricType constants.MetricType) (string, string) {
	switch metricType {
	case constants.MetricTypeDroppedPackets:
		return "PktDropPackets", ""
	case constants.MetricTypePackets:
		return "Packets", ""
	case constants.MetricTypeDroppedBytes:
		return "PktDropBytes", ""
	case constants.MetricTypeBytes:
		return "Bytes", ""
	case constants.MetricTypeDNSLatencies:
		return "DnsLatencyMs", ""
	case constants.MetricTypeFlowRTT:
		return "TimeFlowRttNs", "/1000000" // nanoseconds to milliseconds
	case constants.MetricTypeCount, constants.MetricTypeCountDNS:
		return "", ""
	default:
		// Programmer error: every supported metric type must be handled above.
		// fmt.Sprintf (not fmt.Sprint) so the metric type is separated from the message.
		panic(fmt.Sprintf("wrong metricType for fields and factor provided: %s", metricType))
	}
}

// getFunctionWithScalar maps the requested metric function (and, when no
// explicit function is given, the metric type) to the LogQL range function
// plus an optional scalar argument (the quantile for quantile_over_time).
func getFunctionWithScalar(metricType constants.MetricType, metricFunction constants.MetricFunction) (string, string) {
	switch metricFunction {
	case constants.MetricFunctionMax:
		return "max_over_time", ""
	case constants.MetricFunctionMin:
		return "min_over_time", ""
	case constants.MetricFunctionAvg:
		return "avg_over_time", ""
	case constants.MetricFunctionP90:
		return "quantile_over_time", "0.9"
	case constants.MetricFunctionP99:
		return "quantile_over_time", "0.99"
	default:
		// No explicit function requested: derive a default from the metric type.
		switch metricType {
		case constants.MetricTypeBytes,
			constants.MetricTypePackets,
			constants.MetricTypeDroppedBytes,
			constants.MetricTypeDroppedPackets:
			return "rate", ""
		case constants.MetricTypeCount, constants.MetricTypeCountDNS, constants.MetricTypeFlowRTT, constants.MetricTypeDNSLatencies:
			return "count_over_time", ""
		default:
			// Programmer error: unsupported metric type without an explicit function.
			// fmt.Sprintf (not fmt.Sprint) so the metric type is separated from the message.
			panic(fmt.Sprintf("wrong metricType for function with scalar provided: %s", metricType))
		}
	}
}

func (q *TopologyQueryBuilder) Build() string {
sumBy := q.topology.function == "rate" || q.topology.function == "count_over_time"

// Build topology query like:
// /<url path>?query=
// topk(
// topk | bottomk(
// <k>,
// <sum | avg> by(<aggregations>) (
// <function>(
Expand All @@ -155,19 +185,28 @@ func (q *TopologyQueryBuilder) Build() string {
// )
// &<query params>&step=<step>
sb := q.createStringBuilderURL()
sb.WriteString("topk(")
if q.topology.function == "min_over_time" {
sb.WriteString("bottomk")
} else {
sb.WriteString("topk")
}
sb.WriteRune('(')
sb.WriteString(q.topology.limit)
sb.WriteRune(',')
if q.topology.function == "avg_over_time" {
sb.WriteString("avg")
} else {
sb.WriteString("sum")

if sumBy {
sb.WriteString("sum by(")
sb.WriteString(q.topology.labels)
sb.WriteRune(')')
}
sb.WriteString(" by(")
sb.WriteString(q.topology.fields)
sb.WriteString(") (")

sb.WriteRune('(')
sb.WriteString(q.topology.function)
sb.WriteString("(")
if len(q.topology.scalar) > 0 {
sb.WriteString(q.topology.scalar)
sb.WriteRune(',')
}
q.appendLabels(sb)
q.appendLineFilters(sb)

Expand Down Expand Up @@ -202,10 +241,19 @@ func (q *TopologyQueryBuilder) Build() string {
sb.WriteString(q.topology.rateInterval)
}
sb.WriteString("])")

if !sumBy {
sb.WriteString(" by(")
sb.WriteString(q.topology.labels)
sb.WriteRune(')')
}
sb.WriteRune(')')

if len(q.topology.factor) > 0 {
sb.WriteString(q.topology.factor)
}
sb.WriteString("))")
sb.WriteRune(')')

q.appendQueryParams(sb)
sb.WriteString("&step=")
sb.WriteString(q.topology.step)
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ func TestLokiConfigurationForTopology(t *testing.T) {
req2 := lokiMock.Calls[1].Arguments[1].(*http.Request)
queries := []string{req1.URL.Query().Get("query"), req2.URL.Query().Get("query")}
expected := []string{
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection=~"^0$|^2$"}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection="1",DstK8S_OwnerName=""}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(rate({app="netobserv-flowcollector",FlowDirection=~"^0$|^2$"}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(rate({app="netobserv-flowcollector",FlowDirection="1",DstK8S_OwnerName=""}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
}
// We don't predict the order so sort both actual and expected
sort.Strings(queries)
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestLokiConfigurationForTableHistogram(t *testing.T) {
req1 := lokiMock.Calls[0].Arguments[1].(*http.Request)
query := req1.URL.Query().Get("query")
expected :=
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (count_over_time({app="netobserv-flowcollector"}|~` + "`" + `Duplicate":false` + "`" + `|json[30s])))`
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(count_over_time({app="netobserv-flowcollector"}|~` + "`" + `Duplicate":false` + "`" + `|json[30s])))`
assert.Equal(t, expected, query)

// without any multi-tenancy header
Expand Down
Loading
Loading