Skip to content

Commit

Permalink
optionnal connection tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Feb 21, 2023
1 parent 793772d commit 6939ddc
Show file tree
Hide file tree
Showing 25 changed files with 183 additions and 76 deletions.
2 changes: 2 additions & 0 deletions config/sample-frontend-config.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
recordTypes:
- flowLog
portNaming:
enable: true
portNames:
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/frontend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ type QuickFilter struct {
}

type frontendConfig struct {
PortNaming struct {
RecordTypes []string `yaml:"recordTypes" json:"recordTypes"`
PortNaming struct {
Enable bool `yaml:"enable,omitempty" json:"enable"`
PortNames map[string]string `yaml:"portNames,omitempty" json:"portNames"`
} `yaml:"portNaming,omitempty" json:"portNaming"`
Expand Down
1 change: 1 addition & 0 deletions pkg/handler/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
var testLokiConfig = loki.Config{
URL: &url.URL{Scheme: "http", Host: "loki"},
Labels: map[string]struct{}{
"_RecordType": {},
"SrcK8S_Namespace": {},
"DstK8S_Namespace": {},
"SrcK8S_OwnerName": {},
Expand Down
9 changes: 5 additions & 4 deletions pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
}
metricType := params.Get(metricTypeKey)
reporter := constants.Reporter(params.Get(reporterKey))
recordType := constants.RecordType(params.Get(recordTypeKey))
scope := params.Get(scopeKey)
groups := params.Get(groupsKey)
rawFilters := params.Get(filtersKey)
Expand All @@ -83,7 +84,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
// match any, and multiple filters => run in parallel then aggregate
var queries []string
for _, group := range filterGroups {
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, rateInterval, step, metricType, reporter, scope, groups)
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, rateInterval, step, metricType, recordType, reporter, scope, groups)
if err != nil {
return nil, code, errors.New("Can't build query: " + err.Error())
}
Expand All @@ -99,7 +100,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if len(filterGroups) > 0 {
filters = filterGroups[0]
}
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, rateInterval, step, metricType, reporter, scope, groups)
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, rateInterval, step, metricType, recordType, reporter, scope, groups)
if err != nil {
return nil, code, err
}
Expand All @@ -116,8 +117,8 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
return qr, http.StatusOK, nil
}

func buildTopologyQuery(cfg *loki.Config, queryFilters filters.SingleQuery, start, end, limit, rateInterval, step, metricType string, reporter constants.Reporter, scope, groups string) (string, int, error) {
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, rateInterval, step, metricType, reporter, scope, groups)
func buildTopologyQuery(cfg *loki.Config, queryFilters filters.SingleQuery, start, end, limit, rateInterval, step, metricType string, recordType constants.RecordType, reporter constants.Reporter, scope, groups string) (string, int, error) {
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, rateInterval, step, metricType, recordType, reporter, scope, groups)
if err != nil {
return "", http.StatusBadRequest, err
}
Expand Down
28 changes: 16 additions & 12 deletions pkg/loki/flow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
)

const (
startParam = "start"
endParam = "end"
limitParam = "limit"
queryRangePath = "/loki/api/v1/query_range?query="
jsonOrJoiner = "+or+"
emptyMatch = `""`
recordTypeField = "_RecordType"
startParam = "start"
endParam = "end"
limitParam = "limit"
queryRangePath = "/loki/api/v1/query_range?query="
jsonOrJoiner = "+or+"
emptyMatch = `""`
)

// can contains only alphanumeric / '-' / '_' / '.' / ',' / '"' / '*' / ':' / '/' characteres
Expand All @@ -43,12 +44,15 @@ func NewFlowQueryBuilder(cfg *Config, start, end, limit string, reporter constan
stringLabelFilter(constants.AppLabel, constants.AppLabelValue),
}

if recordType == constants.RecordTypeAllConnections {
// connection _RecordType including newConnection, heartbeat or endConnection
labelFilters = append(labelFilters, regexLabelFilter(constants.RecordTypeLabel, strings.Join(constants.ConnectionTypes, "|")))
} else if len(recordType) > 0 {
// specific _RecordType either newConnection, heartbeat, endConnection or flowLog
labelFilters = append(labelFilters, stringLabelFilter(constants.RecordTypeLabel, string(recordType)))
// only filter on _RecordType if available
if cfg.IsLabel(recordTypeField) {
if recordType == constants.RecordTypeAllConnections {
// connection _RecordType including newConnection, heartbeat or endConnection
labelFilters = append(labelFilters, regexLabelFilter(constants.RecordTypeLabel, strings.Join(constants.ConnectionTypes, "|")))
} else if len(recordType) > 0 {
// specific _RecordType either newConnection, heartbeat, endConnection or flowLog
labelFilters = append(labelFilters, stringLabelFilter(constants.RecordTypeLabel, string(recordType)))
}
}

extraLineFilters := []string{}
Expand Down
23 changes: 18 additions & 5 deletions pkg/loki/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestFlowQuery_AddLabelFilters(t *testing.T) {
err = query.addFilter(filters.NewMatch("flis", `"flas"`))
require.NoError(t, err)
urlQuery := query.Build()
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",_RecordType="flowLog",foo="bar",flis="flas"}`, urlQuery)
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",foo="bar",flis="flas"}`, urlQuery)
}

func TestQuery_BackQuote_Error(t *testing.T) {
Expand All @@ -41,7 +41,7 @@ func TestFlowQuery_AddNotLabelFilters(t *testing.T) {
err = query.addFilter(filters.NewNotMatch("flis", `"flas"`))
require.NoError(t, err)
urlQuery := query.Build()
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",_RecordType="flowLog",foo="bar",flis!="flas"}`, urlQuery)
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",foo="bar",flis!="flas"}`, urlQuery)
}

func backtick(str string) string {
Expand All @@ -56,7 +56,7 @@ func TestFlowQuery_AddLineFilterMultipleValues(t *testing.T) {
err = query.addFilter(filters.NewMatch("foo", `bar,baz`))
require.NoError(t, err)
urlQuery := query.Build()
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",_RecordType="flowLog"}|~`+backtick(`foo":"(?i)[^"]*bar.*"|foo":"(?i)[^"]*baz.*"`), urlQuery)
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector"}|~`+backtick(`foo":"(?i)[^"]*bar.*"|foo":"(?i)[^"]*baz.*"`), urlQuery)
}

func TestFlowQuery_AddNotLineFilters(t *testing.T) {
Expand All @@ -69,7 +69,7 @@ func TestFlowQuery_AddNotLineFilters(t *testing.T) {
err = query.addFilter(filters.NewNotMatch("flis", `"flas"`))
require.NoError(t, err)
urlQuery := query.Build()
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",_RecordType="flowLog"}|~`+backtick(`foo":"bar"`)+`!~`+backtick(`flis":"flas"`), urlQuery)
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector"}|~`+backtick(`foo":"bar"`)+`!~`+backtick(`flis":"flas"`), urlQuery)
}

func TestFlowQuery_AddLineFiltersWithEmpty(t *testing.T) {
Expand All @@ -82,5 +82,18 @@ func TestFlowQuery_AddLineFiltersWithEmpty(t *testing.T) {
err = query.addFilter(filters.NewMatch("flis", `""`))
require.NoError(t, err)
urlQuery := query.Build()
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",_RecordType="flowLog"}|~`+backtick(`foo":"bar"`)+`|json|flis=""`, urlQuery)
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector"}|~`+backtick(`foo":"bar"`)+`|json|flis=""`, urlQuery)
}

func TestFlowQuery_AddRecordTypeLabelFilter(t *testing.T) {
lokiURL, err := url.Parse("/")
require.NoError(t, err)
cfg := NewConfig(lokiURL, lokiURL, time.Second, "", "", false, false, "", false, []string{"foo", "flis", "_RecordType"})
query := NewFlowQueryBuilderWithDefaults(&cfg)
err = query.addFilter(filters.NewMatch("foo", `"bar"`))
require.NoError(t, err)
err = query.addFilter(filters.NewMatch("flis", `"flas"`))
require.NoError(t, err)
urlQuery := query.Build()
assert.Equal(t, `/loki/api/v1/query_range?query={app="netobserv-flowcollector",_RecordType="flowLog",foo="bar",flis="flas"}`, urlQuery)
}
22 changes: 19 additions & 3 deletions pkg/loki/topology_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ type Topology struct {
function string
dataField string
fields string
dedup bool
}

type TopologyQueryBuilder struct {
*FlowQueryBuilder
topology *Topology
}

func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step, metricType string, reporter constants.Reporter, scope, groups string) (*TopologyQueryBuilder, error) {
func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step, metricType string, recordType constants.RecordType, reporter constants.Reporter, scope, groups string) (*TopologyQueryBuilder, error) {
l := limit
if len(l) == 0 {
l = topologyDefaultLimit
Expand All @@ -43,15 +44,28 @@ func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step, metric
t = "Bytes"
}

var d bool
var rt constants.RecordType
switch recordType {
case "allConnections", "newConnection", "heartbeat", "endConnection":
d = false
rt = "endConnection"
case "flowLog":
default:
d = true
rt = "flowLog"
}

return &TopologyQueryBuilder{
FlowQueryBuilder: NewFlowQueryBuilder(cfg, start, end, limit, reporter, ""),
FlowQueryBuilder: NewFlowQueryBuilder(cfg, start, end, limit, reporter, rt),
topology: &Topology{
rateInterval: rateInterval,
step: step,
limit: l,
function: f,
dataField: t,
fields: getFields(scope, groups),
dedup: d,
},
}, nil
}
Expand Down Expand Up @@ -117,7 +131,9 @@ func (q *TopologyQueryBuilder) Build() string {
sb.WriteString("(")
q.appendLabels(sb)
q.appendLineFilters(sb)
q.appendDeduplicateFilter(sb)
if q.topology.dedup {
q.appendDeduplicateFilter(sb)
}
q.appendJSON(sb, true)
if len(q.topology.dataField) > 0 {
sb.WriteString("|unwrap ")
Expand Down
20 changes: 11 additions & 9 deletions web/locales/en/plugin__netobserv-plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@
"Both": "Both",
"Match all": "Match all",
"Match any": "Match any",
"Log type to query. A conversation is an aggregation of flows between same peers.": "Log type to query. A conversation is an aggregation of flows between same peers.",
"Log type to query. A conversation is an aggregation of flows between same peers. Only ended conversations will appear in Overview and Topology tabs.": "Log type to query. A conversation is an aggregation of flows between same peers. Only ended conversations will appear in Overview and Topology tabs.",
"Log type": "Log type",
"Only available in Flow Table view": "Only available in Flow Table view",
"Only available when FlowCollector.processor.outputRecordTypes option includes at least \"newConnection\", \"heartbeat\" or \"endConnection\"": "Only available when FlowCollector.processor.outputRecordTypes option includes at least \"newConnection\", \"heartbeat\" or \"endConnection\"",
"Every flow can be reported from the source node and/or the destination node. For in-cluster traffic, usually both source and destination nodes report flows, resulting in duplicated data. Cluster ingress traffic is only reported by destination nodes, and cluster egress by source nodes.": "Every flow can be reported from the source node and/or the destination node. For in-cluster traffic, usually both source and destination nodes report flows, resulting in duplicated data. Cluster ingress traffic is only reported by destination nodes, and cluster egress by source nodes.",
"Reporter node": "Reporter node",
"Only available in Flow Table view": "Only available in Flow Table view",
"Whether each query result has to match all the filters or just any of them": "Whether each query result has to match all the filters or just any of them",
"Match filters": "Match filters",
"Top items for internal backend queries.": "Top items for internal backend queries.",
Expand Down Expand Up @@ -148,6 +149,8 @@
"Export all datas": "Export all datas",
"Use this option to export every fields and labels from flows.": "Use this option to export every fields and labels from flows.",
"Else pick from available columns.": "Else pick from available columns.",
"flow": "flow",
"conversation": "conversation",
"Manage panels": "Manage panels",
"Selected panels will appear in the overview tab.": "Selected panels will appear in the overview tab.",
"Click and drag the items to reorder the panels in the overview tab.": "Click and drag the items to reorder the panels in the overview tab.",
Expand Down Expand Up @@ -175,8 +178,8 @@
"Remove {{name}} filter": "Remove {{name}} filter",
"Filter on {{name}}": "Filter on {{name}}",
"Switch {{name}} option": "Switch {{name}} option",
"Flow information": "Flow information",
"Conversation event information": "Conversation event information",
"Flow information": "Flow information",
"Details": "Details",
"More info": "More info",
"Raw": "Raw",
Expand Down Expand Up @@ -311,7 +314,6 @@
"Conversation start": "Conversation start",
"Conversation tick": "Conversation tick",
"Conversation end": "Conversation end",
"Conversations": "Conversations",
"Conversation Id": "Conversation Id",
"Last 5 minutes": "Last 5 minutes",
"Last 15 minutes": "Last 15 minutes",
Expand Down Expand Up @@ -370,21 +372,21 @@
"Specify a single conversation hash Id.": "Specify a single conversation hash Id.",
"Pps": "Pps",
"Network overview": "Network overview",
"Top {{limit}} flow rates stacked": "Top {{limit}} flow rates stacked",
"Top {{limit}} {{type}} rates stacked": "Top {{limit}} {{type}} rates stacked",
"bars": "bars",
"Top {{limit}} flow rates stacked with total": "Top {{limit}} flow rates stacked with total",
"Top {{limit}} {{type}} rates stacked with total": "Top {{limit}} {{type}} rates stacked with total",
"The top rates as bar compared to total as line over the selected interval": "The top rates as bar compared to total as line over the selected interval",
"Top {{limit}} flow rates": "Top {{limit}} flow rates",
"Top {{limit}} {{type}} rates": "Top {{limit}} {{type}} rates",
"lines": "lines",
"Top {{limit}} average rates": "Top {{limit}} average rates",
"donut": "donut",
"The average rate over the selected interval": "The average rate over the selected interval",
"Top {{limit}} latest rates": "Top {{limit}} latest rates",
"The last measured rate from the selected interval": "The last measured rate from the selected interval",
"Top {{limit}} flows distribution": "Top {{limit}} flows distribution",
"Top {{limit}} {{type}} distribution": "Top {{limit}} {{type}} distribution",
"sankey": "sankey",
"Total rate": "Total rate",
"line": "line",
"Packets dropped": "Packets dropped",
"Inbound flows by region": "Inbound flows by region"
"Inbound {{type}} by region": "Inbound {{type}} by region"
}
1 change: 1 addition & 0 deletions web/src/api/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export const getConfig = (): Promise<Config> => {
return defaultConfig;
}
return <Config>{
recordTypes: r.data.recordTypes,
portNaming: {
enable: r.data.portNaming.enable ?? defaultConfig.portNaming.enable,
portNames: r.data.portNaming.portNames
Expand Down
1 change: 1 addition & 0 deletions web/src/components/__tests-data__/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export const ConfigResultSample = {
recordTypes: ['flowLog'],
portNaming: {
enable: true,
portNames: new Map([['3100', 'loki']])
Expand Down
16 changes: 14 additions & 2 deletions web/src/components/dropdowns/query-options-dropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ export const QueryOptionsPanel: React.FC<QueryOptionsDropdownProps> = ({
return (
<>
<div className="pf-c-select__menu-group">
<Tooltip content={t('Log type to query. A conversation is an aggregation of flows between same peers.')}>
<Tooltip
content={t(
// eslint-disable-next-line max-len
'Log type to query. A conversation is an aggregation of flows between same peers. Only ended conversations will appear in Overview and Topology tabs.'
)}
>
<div className="pf-c-select__menu-group-title">
<>
{t('Log type')} <InfoAltIcon />
Expand All @@ -98,7 +103,14 @@ export const QueryOptionsPanel: React.FC<QueryOptionsDropdownProps> = ({
<label className="pf-c-select__menu-item">
<Tooltip
trigger={disabled ? 'mouseenter focus' : ''}
content={disabled ? t('Only available in Flow Table view') : undefined}
content={
disabled
? t(
// eslint-disable-next-line max-len
'Only available when FlowCollector.processor.outputRecordTypes option includes at least "newConnection", "heartbeat" or "endConnection"'
)
: undefined
}
>
<Radio
isChecked={opt.value === recordType}
Expand Down
9 changes: 7 additions & 2 deletions web/src/components/filters/filters-dropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ import { buildGroups, getFilterFullName } from './filters-helper';
interface FiltersDropdownProps {
selectedFilter: FilterDefinition;
setSelectedFilter: (f: FilterDefinition) => void;
allowConnectionFilter?: boolean;
}

export const FiltersDropdown: React.FC<FiltersDropdownProps> = ({ selectedFilter, setSelectedFilter }) => {
export const FiltersDropdown: React.FC<FiltersDropdownProps> = ({
selectedFilter,
setSelectedFilter,
allowConnectionFilter
}) => {
const { t } = useTranslation('plugin__netobserv-plugin');
const groups = buildGroups(t);
const groups = buildGroups(t, allowConnectionFilter);

const [isSearchFiltersOpen, setSearchFiltersOpen] = React.useState<boolean>(false);
const [expandedGroup, setExpandedGroup] = React.useState(0);
Expand Down
4 changes: 2 additions & 2 deletions web/src/components/filters/filters-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ export type FilterGroup = {
filters: FilterDefinition[];
};

export const buildGroups = (t: TFunction): FilterGroup[] => {
const defs = getFilterDefinitions(t);
export const buildGroups = (t: TFunction, allowConnectionFilter?: boolean): FilterGroup[] => {
const defs = getFilterDefinitions(t, allowConnectionFilter);
return [
{
title: t('Common'),
Expand Down
8 changes: 7 additions & 1 deletion web/src/components/filters/filters-toolbar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export interface FiltersToolbarProps {
menuContent?: JSX.Element[];
menuControl?: JSX.Element;
quickFilters: QuickFilter[];
allowConnectionFilter?: boolean;
}

export const FiltersToolbar: React.FC<FiltersToolbarProps> = ({
Expand All @@ -66,6 +67,7 @@ export const FiltersToolbar: React.FC<FiltersToolbarProps> = ({
clearFilters,
resetFilters,
quickFilters,
allowConnectionFilter,
...props
}) => {
const { push } = useHistory();
Expand Down Expand Up @@ -276,7 +278,11 @@ export const FiltersToolbar: React.FC<FiltersToolbarProps> = ({
>
<div>
<InputGroup>
<FiltersDropdown selectedFilter={selectedFilter} setSelectedFilter={setSelectedFilter} />
<FiltersDropdown
selectedFilter={selectedFilter}
setSelectedFilter={setSelectedFilter}
allowConnectionFilter={allowConnectionFilter}
/>
{getFilterControl()}
</InputGroup>
<FilterHints def={selectedFilter} />
Expand Down
Loading

0 comments on commit 6939ddc

Please sign in to comment.