From 48df8a794495bb8e0a95489dc671609b968b84eb Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Thu, 23 Jun 2022 20:59:25 +0800 Subject: [PATCH 1/6] add url clustering methods Signed-off-by: Daxin Wang --- collector/pkg/urlclustering/alphabet.go | 105 +++++++++++++++++++ collector/pkg/urlclustering/alphabet_test.go | 83 +++++++++++++++ collector/pkg/urlclustering/noparam.go | 26 +++++ collector/pkg/urlclustering/urlcluster.go | 34 ++++++ 4 files changed, 248 insertions(+) create mode 100644 collector/pkg/urlclustering/alphabet.go create mode 100644 collector/pkg/urlclustering/alphabet_test.go create mode 100644 collector/pkg/urlclustering/noparam.go create mode 100644 collector/pkg/urlclustering/urlcluster.go diff --git a/collector/pkg/urlclustering/alphabet.go b/collector/pkg/urlclustering/alphabet.go new file mode 100644 index 000000000..d1922513d --- /dev/null +++ b/collector/pkg/urlclustering/alphabet.go @@ -0,0 +1,105 @@ +package urlclustering + +import ( + "regexp" + "strings" +) + +// AlphabeticClusteringMethod clustering all the non-alphabetic characters to '*' +// and will trim the parameters at the end of the string. +type AlphabeticClusteringMethod struct { + regexp *regexp.Regexp +} + +func NewAlphabeticalClusteringMethod() *AlphabeticClusteringMethod { + exp, _ := regexp.Compile("^[A-Za-z_-]+$") + return &AlphabeticClusteringMethod{ + regexp: exp, + } +} + +func (m *AlphabeticClusteringMethod) ClusteringBaseline(endpoint string) string { + if endpoint == "" { + return "" + } + index := strings.Index(endpoint, "?") + if index != -1 { + endpoint = endpoint[:index] + } + + // Split the endpoint into multiple segments + endpoint = strings.TrimSpace(endpoint) + segments := strings.Split(endpoint, "/") + // Iterate over all parts and execute the regular expression. + resultSegments := make([]string, 0, len(segments)) + // Skip the first segment because it is supposed to be always empty. + for i := 1; i < len(segments); i++ { + if segments[i] == "" || m.regexp.MatchString(segments[i]) { + resultSegments = append(resultSegments, segments[i]) + } else { + // If the segment is composed of non-alphabetic characters, we replace it with a star. + resultSegments = append(resultSegments, "*") + } + } + // Re-combine all parts together + var resultEndpoint string + for _, seg := range resultSegments { + resultEndpoint = resultEndpoint + "/" + seg + } + + return resultEndpoint +} + +func (m *AlphabeticClusteringMethod) Clustering(endpoint string) string { + if endpoint == "" { + return "" + } + endpointBytes := []byte(endpoint) + + resultBytes := make([]byte, 0) + + currentSegmentIsStar := false + currentSegment := make([]byte, 0) + + for _, b := range endpointBytes { + if b == ' ' { + continue + } + // End of the current segment. + if b == '/' || b == '?' { + if currentSegmentIsStar { + resultBytes = append(resultBytes, '*') + } else { + // currentSegment could be empty + resultBytes = append(resultBytes, currentSegment...) + } + currentSegment = make([]byte, 0) + currentSegmentIsStar = false + if b == '/' { + resultBytes = append(resultBytes, '/') + continue + } else if b == '?' { + break + } + } + if currentSegmentIsStar { + continue + } + if isAlphabetical(b) { + currentSegment = append(currentSegment, b) + } else { + currentSegmentIsStar = true + } + } + // Deal with the last segment. + if currentSegmentIsStar { + resultBytes = append(resultBytes, '*') + } else if len(currentSegment) != 0 { + resultBytes = append(resultBytes, currentSegment...) + } + return string(resultBytes) +} + +func isAlphabetical(b byte) bool { + return b == '-' || b == '_' || (b >= 'A' && b <= 'Z') || (b >= 'a' && b <= 'z') +} diff --git a/collector/pkg/urlclustering/alphabet_test.go b/collector/pkg/urlclustering/alphabet_test.go new file mode 100644 index 000000000..764650ebe --- /dev/null +++ b/collector/pkg/urlclustering/alphabet_test.go @@ -0,0 +1,83 @@ +package urlclustering + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type testCase struct { + endpoint string + want string +} + +func newTestcases() []testCase { + return []testCase{ + {"/", "/"}, + {"/A/b/20000/", "/A/b/*/"}, + {" /test/22?v=a", "/test/*"}, + {"/a12?a=1132", "/*"}, + {"/abcd/1234a/efg/b&*", "/abcd/*/efg/*"}, + // Double slashes is valid but not recommended + {"/a//b/c?d=2&e=3", "/a//b/c"}, + } +} + +func TestAlphabeticClusteringMethod_Clustering(t *testing.T) { + method := NewAlphabeticalClusteringMethod() + testCases := newTestcases() + for _, c := range testCases { + assert.Equal(t, c.want, method.Clustering(c.endpoint)) + } +} + +func TestAlphabeticClusteringMethod_ClusteringBaseline(t *testing.T) { + method := NewAlphabeticalClusteringMethod() + testCases := newTestcases() + for _, c := range testCases { + assert.Equal(t, c.want, method.ClusteringBaseline(c.endpoint)) + } +} + +func Test_isAlphabetical(t *testing.T) { + type args struct { + b byte + } + tests := []struct { + name string + args args + want bool + }{ + {"alphabetical", args{'a'}, true}, + {"non-alphabetical", args{'%'}, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isAlphabetical(tt.args.b); got != tt.want { + t.Errorf("isAlphabetical() = %v, want %v", got, tt.want) + } + }) + } +} + +func Benchmark_ClusteringBaseline(b *testing.B) { + method := NewAlphabeticalClusteringMethod() + testCases := newTestcases() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, c := range testCases { + method.ClusteringBaseline(c.endpoint) + } + } +} + +func Benchmark_Clustering(b *testing.B) { + method := NewAlphabeticalClusteringMethod() + testCases := newTestcases() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, c := range testCases { + method.Clustering(c.endpoint) + } + } +} diff --git a/collector/pkg/urlclustering/noparam.go b/collector/pkg/urlclustering/noparam.go new file mode 100644 index 000000000..82610d571 --- /dev/null +++ b/collector/pkg/urlclustering/noparam.go @@ -0,0 +1,26 @@ +package urlclustering + +import ( + "strings" +) + +// NoParamClusteringMethod removes the parameters that are end of the URL string. +type NoParamClusteringMethod struct { +} + +func NewNoParamClusteringMethod() ClusteringMethod { + return &NoParamClusteringMethod{} +} + +func (m *NoParamClusteringMethod) Clustering(endpoint string) string { + if endpoint == "" { + return "" + } + // Remove the parameters first + index := strings.Index(endpoint, "?") + if index != -1 { + endpoint = endpoint[:index] + } + + return endpoint +} diff --git a/collector/pkg/urlclustering/urlcluster.go b/collector/pkg/urlclustering/urlcluster.go new file mode 100644 index 000000000..21d90dbe1 --- /dev/null +++ b/collector/pkg/urlclustering/urlcluster.go @@ -0,0 +1,34 @@ +package urlclustering + +type ClusteringMethod interface { + // Clustering receives a no-host endpoint string of the HTTP request and + // return its clustering result. + // Some examples of the endpoint: + // - /path/to/file/1 + // - /part1/part2/2?param=1 + // - /CloudDetective-Harmonycloud/kindling + Clustering(endpoint string) string +} + +var ( + alphabeticMethod ClusteringMethod + noParamMethod ClusteringMethod +) + +// AlphabeticClustering is a convenient method that calls AlphabeticClusteringMethod.Clustering(). +// This method is not thread-safe. +func AlphabeticClustering(endpoint string) string { + if alphabeticMethod == nil { + alphabeticMethod = NewAlphabeticalClusteringMethod() + } + return alphabeticMethod.Clustering(endpoint) +} + +// NoParamClustering is a global method that calls NoParamClusteringMethod.Clustering(). +// This method is not thread-safe. +func NoParamClustering(endpoint string) string { + if noParamMethod == nil { + noParamMethod = NewNoParamClusteringMethod() + } + return noParamMethod.Clustering(endpoint) +} From 156808572d704688dc8f93e590cc704d8fde5b2b Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Thu, 23 Jun 2022 21:01:38 +0800 Subject: [PATCH 2/6] networkanalyzer uses url-clustering method Signed-off-by: Daxin Wang --- collector/analyzer/network/protocol/http/http_request.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/collector/analyzer/network/protocol/http/http_request.go b/collector/analyzer/network/protocol/http/http_request.go index 0078d9f89..e06bff33b 100644 --- a/collector/analyzer/network/protocol/http/http_request.go +++ b/collector/analyzer/network/protocol/http/http_request.go @@ -4,6 +4,7 @@ import ( "strings" "github.com/Kindling-project/kindling/collector/analyzer/tools" + "github.com/Kindling-project/kindling/collector/pkg/urlclustering" "github.com/Kindling-project/kindling/collector/analyzer/network/protocol" "github.com/Kindling-project/kindling/collector/model/constlabels" @@ -55,7 +56,7 @@ func parseHttpRequest() protocol.ParsePkgFn { message.AddByteArrayUtf8Attribute(constlabels.HttpUrl, url) message.AddByteArrayUtf8Attribute(constlabels.HttpRequestPayload, message.GetData(0, protocol.GetHttpPayLoadLength())) - contentKey := getContentKey(string(url)) + contentKey := urlclustering.AlphabeticClustering(string(url)) if len(contentKey) == 0 { contentKey = "*" } From 7f0a71cf1a7064343215dbc5feed50d28911527c Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Fri, 24 Jun 2022 10:52:24 +0800 Subject: [PATCH 3/6] add more testcases Signed-off-by: Daxin Wang --- collector/pkg/urlclustering/noparam.go | 1 + collector/pkg/urlclustering/noparam_test.go | 27 +++++++++++++++++++++ 2 files changed, 28 insertions(+) create mode 100644 collector/pkg/urlclustering/noparam_test.go diff --git a/collector/pkg/urlclustering/noparam.go b/collector/pkg/urlclustering/noparam.go index 82610d571..bfbd4d758 100644 --- a/collector/pkg/urlclustering/noparam.go +++ b/collector/pkg/urlclustering/noparam.go @@ -16,6 +16,7 @@ func (m *NoParamClusteringMethod) Clustering(endpoint string) string { if endpoint == "" { return "" } + endpoint = strings.TrimSpace(endpoint) // Remove the parameters first index := strings.Index(endpoint, "?") if index != -1 { diff --git a/collector/pkg/urlclustering/noparam_test.go b/collector/pkg/urlclustering/noparam_test.go new file mode 100644 index 000000000..99e0e1ac1 --- /dev/null +++ b/collector/pkg/urlclustering/noparam_test.go @@ -0,0 +1,27 @@ +package urlclustering + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func noParamTestcases() []testCase { + return []testCase{ + {"/", "/"}, + {"/A/b/20000/", "/A/b/20000/"}, + {" /test/22?v=a", "/test/22"}, + {"/a12?a=1132", "/a12"}, + {"/abcd/1234a/efg/b&*", "/abcd/1234a/efg/b&*"}, + // Double slashes is valid but not recommended + {"/a//b/c?d=2&e=3", "/a//b/c"}, + } +} + +func TestNoParamClusteringMethod_Clustering(t *testing.T) { + method := NewNoParamClusteringMethod() + testCases := noParamTestcases() + for _, c := range testCases { + assert.Equal(t, c.want, method.Clustering(c.endpoint)) + } +} From 5c32196f6ec77ed4ebe20eb34334d8ff921dea69 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Fri, 24 Jun 2022 15:53:11 +0800 Subject: [PATCH 4/6] add configuration of url clustering Signed-off-by: Daxin Wang --- collector/analyzer/network/config.go | 54 ++++++++++- .../analyzer/network/network_analyzer.go | 12 ++- .../network/protocol/factory/config.go | 19 ++++ .../network/protocol/factory/factory.go | 96 +++++++++---------- .../network/protocol/http/http_parser.go | 14 ++- .../network/protocol/http/http_parser_test.go | 4 +- .../network/protocol/http/http_request.go | 4 +- .../docker/kindling-collector-config.yml | 6 ++ deploy/agent/kindling-collector-config.yml | 6 ++ 9 files changed, 154 insertions(+), 61 deletions(-) create mode 100644 collector/analyzer/network/protocol/factory/config.go diff --git a/collector/analyzer/network/config.go b/collector/analyzer/network/config.go index d64275b57..8caf3f844 100644 --- a/collector/analyzer/network/config.go +++ b/collector/analyzer/network/config.go @@ -17,8 +17,58 @@ type Config struct { ConntrackRateLimit int `mapstructure:"conntrack_rate_limit"` ProcRoot string `mapstructure:"proc_root"` - ProtocolParser []string `mapstructure:"protocol_parser"` - ProtocolConfigs []ProtocolConfig `mapstructure:"protocol_config,omitempty"` + ProtocolParser []string `mapstructure:"protocol_parser"` + ProtocolConfigs []ProtocolConfig `mapstructure:"protocol_config,omitempty"` + UrlClusteringMethod string `mapstructure:"url_clustering_method"` +} + +func NewDefaultConfig() *Config { + return &Config{ + ConnectTimeout: 100, + RequestTimeout: 60, + ResponseSlowThreshold: 500, + EnableConntrack: true, + ConntrackMaxStateSize: 131072, + ConntrackRateLimit: 500, + ProcRoot: "/proc", + ProtocolParser: []string{"http", "mysql", "dns", "redis", "kafka", "dubbo"}, + ProtocolConfigs: []ProtocolConfig{ + { + Key: "http", + PayloadLength: 200, + }, + { + Key: "dubbo", + PayloadLength: 200, + }, + { + Key: "mysql", + Ports: []uint32{3306}, + Threshold: 100, + }, + { + Key: "kafka", + Ports: []uint32{9092}, + Threshold: 100, + }, + { + Key: "dns", + Ports: []uint32{53}, + Threshold: 100, + }, + { + Key: "cassandra", + Ports: []uint32{9042}, + Threshold: 100, + }, + { + Key: "s3", + Ports: []uint32{9190}, + Threshold: 100, + }, + }, + UrlClusteringMethod: "alphabet", + } } type ProtocolConfig struct { diff --git a/collector/analyzer/network/network_analyzer.go b/collector/analyzer/network/network_analyzer.go index f86a397c9..a72a9b261 100644 --- a/collector/analyzer/network/network_analyzer.go +++ b/collector/analyzer/network/network_analyzer.go @@ -39,6 +39,7 @@ type NetworkAnalyzer struct { staticPortMap map[uint32]string slowThresholdMap map[string]int protocolMap map[string]*protocol.ProtocolParser + parserFactory *factory.ParserFactory parsers []*protocol.ProtocolParser dataGroupPool *DataGroupPool @@ -68,6 +69,7 @@ func NewNetworkAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, co na.conntracker, _ = conntracker.NewConntracker(connConfig) } + na.parserFactory = factory.NewParserFactory(factory.WithUrlClusteringMethod(na.cfg.UrlClusteringMethod)) return na } @@ -108,7 +110,7 @@ func (na *NetworkAnalyzer) Start() error { na.protocolMap = map[string]*protocol.ProtocolParser{} parsers := make([]*protocol.ProtocolParser, 0) for _, protocol := range na.cfg.ProtocolParser { - protocolparser := factory.GetParser(protocol) + protocolparser := na.parserFactory.GetParser(protocol) if protocolparser != nil { na.protocolMap[protocol] = protocolparser disableDisern, ok := disableDisernProtocols[protocol] @@ -118,7 +120,7 @@ func (na *NetworkAnalyzer) Start() error { } } // Add Generic Last - parsers = append(parsers, factory.GetGenericParser()) + parsers = append(parsers, na.parserFactory.GetGenericParser()) na.parsers = parsers rand.Seed(time.Now().UnixNano()) @@ -360,7 +362,7 @@ func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.DataGroup // Step2 Cache protocol and port // TODO There is concurrent modify case when looping. Considering threadsafe. - cacheParsers, ok := factory.GetCachedParsersByPort(port) + cacheParsers, ok := na.parserFactory.GetCachedParsersByPort(port) if ok { for _, parser := range cacheParsers { records := na.parseProtocol(mps, parser) @@ -369,7 +371,7 @@ func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.DataGroup // Reset mapping for generic and port when exceed threshold so as to parsed by other protcols. if parser.AddPortCount(port) == CACHE_RESET_THRESHOLD { parser.ResetPort(port) - factory.RemoveCachedParser(port, parser) + na.parserFactory.RemoveCachedParser(port, parser) } } return records @@ -383,7 +385,7 @@ func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.DataGroup if records != nil { // Add mapping for port and protocol when exceed threshold if parser.AddPortCount(port) == CACHE_ADD_THRESHOLD { - factory.AddCachedParser(port, parser) + na.parserFactory.AddCachedParser(port, parser) } return records } diff --git a/collector/analyzer/network/protocol/factory/config.go b/collector/analyzer/network/protocol/factory/config.go new file mode 100644 index 000000000..4998de3b0 --- /dev/null +++ b/collector/analyzer/network/protocol/factory/config.go @@ -0,0 +1,19 @@ +package factory + +type config struct { + urlClusteringMethod string +} + +func newDefaultConfig() *config { + return &config{ + urlClusteringMethod: "alphabet", + } +} + +type Option func(cfg *config) + +func WithUrlClusteringMethod(urlClusteringMethod string) Option { + return func(cfg *config) { + cfg.urlClusteringMethod = urlClusteringMethod + } +} diff --git a/collector/analyzer/network/protocol/factory/factory.go b/collector/analyzer/network/protocol/factory/factory.go index 4725bc99a..71648218a 100644 --- a/collector/analyzer/network/protocol/factory/factory.go +++ b/collector/analyzer/network/protocol/factory/factory.go @@ -13,56 +13,56 @@ import ( "github.com/Kindling-project/kindling/collector/analyzer/network/protocol/redis" ) -var ( - cache_port_parsers_map = make(map[uint32][]*protocol.ProtocolParser) - mutex = sync.Mutex{} +type ParserFactory struct { + cachePortParsersMap map[uint32][]*protocol.ProtocolParser + mutex sync.Mutex + protocolParsers map[string]*protocol.ProtocolParser - generic_parser *protocol.ProtocolParser = generic.NewGenericParser() - http_parser *protocol.ProtocolParser = http.NewHttpParser() - kafka_parser *protocol.ProtocolParser = kafka.NewKafkaParser() - mysql_parser *protocol.ProtocolParser = mysql.NewMysqlParser() - redis_parser *protocol.ProtocolParser = redis.NewRedisParser() - dubbo_parser *protocol.ProtocolParser = dubbo.NewDubboParser() - dns_parser *protocol.ProtocolParser = dns.NewDnsParser() -) + config *config +} -func GetParser(key string) *protocol.ProtocolParser { - switch key { - case protocol.HTTP: - return http_parser - case protocol.KAFKA: - return kafka_parser - case protocol.MYSQL: - return mysql_parser - case protocol.REDIS: - return redis_parser - case protocol.DUBBO: - return dubbo_parser - case protocol.DNS: - return dns_parser - default: - return nil +func NewParserFactory(options ...Option) *ParserFactory { + factory := &ParserFactory{ + cachePortParsersMap: make(map[uint32][]*protocol.ProtocolParser), + protocolParsers: make(map[string]*protocol.ProtocolParser), + config: newDefaultConfig(), } + for _, option := range options { + option(factory.config) + } + factory.protocolParsers[protocol.HTTP] = http.NewHttpParser(factory.config.urlClusteringMethod) + factory.protocolParsers[protocol.KAFKA] = kafka.NewKafkaParser() + factory.protocolParsers[protocol.MYSQL] = mysql.NewMysqlParser() + factory.protocolParsers[protocol.REDIS] = redis.NewRedisParser() + factory.protocolParsers[protocol.DUBBO] = dubbo.NewDubboParser() + factory.protocolParsers[protocol.DNS] = dns.NewDnsParser() + factory.protocolParsers[protocol.NOSUPPORT] = generic.NewGenericParser() + + return factory } -func GetGenericParser() *protocol.ProtocolParser { - return generic_parser +func (f *ParserFactory) GetParser(key string) *protocol.ProtocolParser { + return f.protocolParsers[key] } -func GetCachedParsersByPort(port uint32) ([]*protocol.ProtocolParser, bool) { - mutex.Lock() - parser, ok := cache_port_parsers_map[port] - mutex.Unlock() +func (f *ParserFactory) GetGenericParser() *protocol.ProtocolParser { + return f.protocolParsers[protocol.NOSUPPORT] +} + +func (f *ParserFactory) GetCachedParsersByPort(port uint32) ([]*protocol.ProtocolParser, bool) { + f.mutex.Lock() + parser, ok := f.cachePortParsersMap[port] + f.mutex.Unlock() return parser, ok } -func AddCachedParser(port uint32, parser *protocol.ProtocolParser) { - mutex.Lock() - if val := cache_port_parsers_map[port]; val == nil { +func (f *ParserFactory) AddCachedParser(port uint32, parser *protocol.ProtocolParser) { + f.mutex.Lock() + if val := f.cachePortParsersMap[port]; val == nil { parsers := make([]*protocol.ProtocolParser, 0) parsers = append(parsers, parser) - cache_port_parsers_map[port] = parsers + f.cachePortParsersMap[port] = parsers } else { exist := false for _, value := range val { @@ -71,31 +71,31 @@ func AddCachedParser(port uint32, parser *protocol.ProtocolParser) { break } } - + genericParser := f.GetGenericParser() if !exist { // Make sure Generic is last - if len(val) > 0 && val[len(val)-1] == generic_parser { + if len(val) > 0 && val[len(val)-1] == genericParser { parsers := append(val[0:len(val)-1], parser) - parsers = append(parsers, generic_parser) - cache_port_parsers_map[port] = parsers + parsers = append(parsers, genericParser) + f.cachePortParsersMap[port] = parsers } else { parsers := append(val, parser) - cache_port_parsers_map[port] = parsers + f.cachePortParsersMap[port] = parsers } } } - mutex.Unlock() + f.mutex.Unlock() } -func RemoveCachedParser(port uint32, parser *protocol.ProtocolParser) { - mutex.Lock() - if val, ok := cache_port_parsers_map[port]; ok { +func (f *ParserFactory) RemoveCachedParser(port uint32, parser *protocol.ProtocolParser) { + f.mutex.Lock() + if val, ok := f.cachePortParsersMap[port]; ok { for i, value := range val { if value == parser { val = append(val[:i], val[i+1:]...) - cache_port_parsers_map[port] = val + f.cachePortParsersMap[port] = val } } } - mutex.Unlock() + f.mutex.Unlock() } diff --git a/collector/analyzer/network/protocol/http/http_parser.go b/collector/analyzer/network/protocol/http/http_parser.go index a9110ab0c..b715a738a 100644 --- a/collector/analyzer/network/protocol/http/http_parser.go +++ b/collector/analyzer/network/protocol/http/http_parser.go @@ -4,10 +4,20 @@ import ( "strings" "github.com/Kindling-project/kindling/collector/analyzer/network/protocol" + "github.com/Kindling-project/kindling/collector/pkg/urlclustering" ) -func NewHttpParser() *protocol.ProtocolParser { - requestParser := protocol.CreatePkgParser(fastfailHttpRequest(), parseHttpRequest()) +func NewHttpParser(urlClusteringMethod string) *protocol.ProtocolParser { + var method urlclustering.ClusteringMethod + switch urlClusteringMethod { + case "alphabet": + method = urlclustering.NewAlphabeticalClusteringMethod() + case "noparam": + method = urlclustering.NewNoParamClusteringMethod() + default: + method = urlclustering.NewAlphabeticalClusteringMethod() + } + requestParser := protocol.CreatePkgParser(fastfailHttpRequest(), parseHttpRequest(method)) responseParser := protocol.CreatePkgParser(fastfailHttpResponse(), parseHttpResponse()) return protocol.NewProtocolParser(protocol.HTTP, requestParser, responseParser, nil) diff --git a/collector/analyzer/network/protocol/http/http_parser_test.go b/collector/analyzer/network/protocol/http/http_parser_test.go index 917689ea3..3c0a8884a 100644 --- a/collector/analyzer/network/protocol/http/http_parser_test.go +++ b/collector/analyzer/network/protocol/http/http_parser_test.go @@ -98,7 +98,7 @@ func TestParseHttpRequest_GetPayLoad(t *testing.T) { t.Run(tt.name, func(t *testing.T) { protocol.SetPayLoadLength(protocol.HTTP, tt.size) message := protocol.NewRequestMessage([]byte(httpData)) - NewHttpParser().ParseRequest(message) + NewHttpParser("").ParseRequest(message) if !message.HasAttribute(constlabels.HttpRequestPayload) { t.Errorf("Fail to parse HttpRequest()") @@ -127,7 +127,7 @@ func TestParseHttpResponse_GetPayLoad(t *testing.T) { protocol.SetPayLoadLength(protocol.HTTP, tt.size) message := protocol.NewResponseMessage([]byte(httpData), model.NewAttributeMap()) - NewHttpParser().ParseResponse(message) + NewHttpParser("").ParseResponse(message) if !message.HasAttribute(constlabels.HttpResponsePayload) { t.Errorf("Fail to parse HttpResponse()") diff --git a/collector/analyzer/network/protocol/http/http_request.go b/collector/analyzer/network/protocol/http/http_request.go index e06bff33b..4e35aee42 100644 --- a/collector/analyzer/network/protocol/http/http_request.go +++ b/collector/analyzer/network/protocol/http/http_request.go @@ -27,7 +27,7 @@ Request line Request header Request body */ -func parseHttpRequest() protocol.ParsePkgFn { +func parseHttpRequest(urlClusteringMethod urlclustering.ClusteringMethod) protocol.ParsePkgFn { return func(message *protocol.PayloadMessage) (bool, bool) { offset, method := message.ReadUntilBlankWithLength(message.Offset, 8) @@ -56,7 +56,7 @@ func parseHttpRequest() protocol.ParsePkgFn { message.AddByteArrayUtf8Attribute(constlabels.HttpUrl, url) message.AddByteArrayUtf8Attribute(constlabels.HttpRequestPayload, message.GetData(0, protocol.GetHttpPayLoadLength())) - contentKey := urlclustering.AlphabeticClustering(string(url)) + contentKey := urlClusteringMethod.Clustering(string(url)) if len(contentKey) == 0 { contentKey = "*" } diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index 5e7abd35b..b9218f584 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -49,6 +49,12 @@ analyzers: # The protocol parsers which is enabled # When dissectors are enabled, agent will analyze the payload and enrich metric/trace with its content. protocol_parser: [ http, mysql, dns, redis, kafka, dubbo ] + # Which URL clustering method should be used. + # Valid values: ["noparam", "alphabet"] + # - noparam: Only trim the trailing parameters behind the character '?' + # - alphabet: Trim the trailing parameters and Convert the segments + # containing non-alphabetical characters to star(*) + url_clustering_method: alphabet # If the destination port of data is one of the followings, the protocol of such network request # is set to the corresponding one. Note the program will try to identify the protocol automatically # for the ports that are not in the lists, in which case the cpu usage will be increased much inevitably. diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index b9fc25555..c35657733 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -49,6 +49,12 @@ analyzers: # The protocol parsers which is enabled # When dissectors are enabled, agent will analyze the payload and enrich metric/trace with its content. protocol_parser: [ http, mysql, dns, redis, kafka, dubbo ] + # Which URL clustering method should be used. + # Valid values: ["noparam", "alphabet"] + # - noparam: Only trim the trailing parameters behind the character '?' + # - alphabet: Trim the trailing parameters and Convert the segments + # containing non-alphabetical characters to star(*) + url_clustering_method: alphabet # If the destination port of data is one of the followings, the protocol of such network request # is set to the corresponding one. Note the program will try to identify the protocol automatically # for the ports that are not in the lists, in which case the cpu usage will be increased much inevitably. From 9a216ffac5a3e12238f7f1c5b23aedabf59c108e Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Fri, 24 Jun 2022 16:20:10 +0800 Subject: [PATCH 5/6] improve the comments of the new option Signed-off-by: Daxin Wang --- collector/docker/kindling-collector-config.yml | 3 ++- deploy/agent/kindling-collector-config.yml | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/collector/docker/kindling-collector-config.yml b/collector/docker/kindling-collector-config.yml index b9218f584..2192f3433 100644 --- a/collector/docker/kindling-collector-config.yml +++ b/collector/docker/kindling-collector-config.yml @@ -49,7 +49,8 @@ analyzers: # The protocol parsers which is enabled # When dissectors are enabled, agent will analyze the payload and enrich metric/trace with its content. protocol_parser: [ http, mysql, dns, redis, kafka, dubbo ] - # Which URL clustering method should be used. + # Which URL clustering method should be used to shorten the URL of HTTP request. + # This is useful for decrease the cardinality of URLs. # Valid values: ["noparam", "alphabet"] # - noparam: Only trim the trailing parameters behind the character '?' # - alphabet: Trim the trailing parameters and Convert the segments diff --git a/deploy/agent/kindling-collector-config.yml b/deploy/agent/kindling-collector-config.yml index c35657733..ac9f6a106 100644 --- a/deploy/agent/kindling-collector-config.yml +++ b/deploy/agent/kindling-collector-config.yml @@ -49,7 +49,8 @@ analyzers: # The protocol parsers which is enabled # When dissectors are enabled, agent will analyze the payload and enrich metric/trace with its content. protocol_parser: [ http, mysql, dns, redis, kafka, dubbo ] - # Which URL clustering method should be used. + # Which URL clustering method should be used to shorten the URL of HTTP request. + # This is useful for decrease the cardinality of URLs. # Valid values: ["noparam", "alphabet"] # - noparam: Only trim the trailing parameters behind the character '?' # - alphabet: Trim the trailing parameters and Convert the segments From 628d63f9cf264381d2e6b17c2172a7939e2ce079 Mon Sep 17 00:00:00 2001 From: Daxin Wang Date: Fri, 24 Jun 2022 17:31:55 +0800 Subject: [PATCH 6/6] convert the segments that are longer than 25 to stars Signed-off-by: Daxin Wang --- collector/pkg/urlclustering/alphabet.go | 21 ++++++++++++++++---- collector/pkg/urlclustering/alphabet_test.go | 10 +++++++--- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/collector/pkg/urlclustering/alphabet.go b/collector/pkg/urlclustering/alphabet.go index d1922513d..5ff967112 100644 --- a/collector/pkg/urlclustering/alphabet.go +++ b/collector/pkg/urlclustering/alphabet.go @@ -18,6 +18,8 @@ func NewAlphabeticalClusteringMethod() *AlphabeticClusteringMethod { } } +// ClusteringBaseline is a more readable version of Clustering() but with a poor performance. +// Don't use this function at anytime. func (m *AlphabeticClusteringMethod) ClusteringBaseline(endpoint string) string { if endpoint == "" { return "" @@ -32,8 +34,12 @@ func (m *AlphabeticClusteringMethod) ClusteringBaseline(endpoint string) string segments := strings.Split(endpoint, "/") // Iterate over all parts and execute the regular expression. resultSegments := make([]string, 0, len(segments)) - // Skip the first segment because it is supposed to be always empty. - for i := 1; i < len(segments); i++ { + for i := 0; i < len(segments); i++ { + // If the current segment is too long, we consider it as a high-cardinality variable. + if len(segments[i]) > 25 { + resultSegments = append(resultSegments, "*") + continue + } if segments[i] == "" || m.regexp.MatchString(segments[i]) { resultSegments = append(resultSegments, segments[i]) } else { @@ -43,8 +49,11 @@ func (m *AlphabeticClusteringMethod) ClusteringBaseline(endpoint string) string } // Re-combine all parts together var resultEndpoint string - for _, seg := range resultSegments { - resultEndpoint = resultEndpoint + "/" + seg + for i, seg := range resultSegments { + resultEndpoint = resultEndpoint + seg + if i != len(resultSegments)-1 { + resultEndpoint += "/" + } } return resultEndpoint @@ -87,6 +96,10 @@ func (m *AlphabeticClusteringMethod) Clustering(endpoint string) string { } if isAlphabetical(b) { currentSegment = append(currentSegment, b) + // If the current segment is too long, we consider it as a high-cardinality variable. + if len(currentSegment) > 25 { + currentSegmentIsStar = true + } } else { currentSegmentIsStar = true } diff --git a/collector/pkg/urlclustering/alphabet_test.go b/collector/pkg/urlclustering/alphabet_test.go index 764650ebe..2fcd40990 100644 --- a/collector/pkg/urlclustering/alphabet_test.go +++ b/collector/pkg/urlclustering/alphabet_test.go @@ -13,13 +13,17 @@ type testCase struct { func newTestcases() []testCase { return []testCase{ + {"", ""}, {"/", "/"}, - {"/A/b/20000/", "/A/b/*/"}, - {" /test/22?v=a", "/test/*"}, - {"/a12?a=1132", "/*"}, + {"/A/b/_20000/", "/A/b/*/"}, + {" /test_a/22?v=a", "/test_a/*"}, + {"/a-12?a=1132", "/*"}, {"/abcd/1234a/efg/b&*", "/abcd/*/efg/*"}, // Double slashes is valid but not recommended {"/a//b/c?d=2&e=3", "/a//b/c"}, + // Although this is not a valid endpoint, we still accept it. + {"noslash/and/1234", "noslash/and/*"}, + {"/a/b/it-is-a-long-segment-like-uuid-or-document-name-or-something-wired?v=1&b=3", "/a/b/*"}, } }