Add a URL clustering method to reduce the cardinality #268

Merged: 6 commits, Jun 27, 2022

54 changes: 52 additions & 2 deletions collector/analyzer/network/config.go
@@ -17,8 +17,58 @@ type Config struct {
ConntrackRateLimit int `mapstructure:"conntrack_rate_limit"`
ProcRoot string `mapstructure:"proc_root"`

-	ProtocolParser  []string         `mapstructure:"protocol_parser"`
-	ProtocolConfigs []ProtocolConfig `mapstructure:"protocol_config,omitempty"`
+	ProtocolParser      []string         `mapstructure:"protocol_parser"`
+	ProtocolConfigs     []ProtocolConfig `mapstructure:"protocol_config,omitempty"`
+	UrlClusteringMethod string           `mapstructure:"url_clustering_method"`
}

+func NewDefaultConfig() *Config {
+	return &Config{
+		ConnectTimeout:        100,
+		RequestTimeout:        60,
+		ResponseSlowThreshold: 500,
+		EnableConntrack:       true,
+		ConntrackMaxStateSize: 131072,
+		ConntrackRateLimit:    500,
+		ProcRoot:              "/proc",
+		ProtocolParser:        []string{"http", "mysql", "dns", "redis", "kafka", "dubbo"},
+		ProtocolConfigs: []ProtocolConfig{
+			{
+				Key:           "http",
+				PayloadLength: 200,
+			},
+			{
+				Key:           "dubbo",
+				PayloadLength: 200,
+			},
+			{
+				Key:       "mysql",
+				Ports:     []uint32{3306},
+				Threshold: 100,
+			},
+			{
+				Key:       "kafka",
+				Ports:     []uint32{9092},
+				Threshold: 100,
+			},
+			{
+				Key:       "dns",
+				Ports:     []uint32{53},
+				Threshold: 100,
+			},
+			{
+				Key:       "cassandra",
+				Ports:     []uint32{9042},
+				Threshold: 100,
+			},
+			{
+				Key:       "s3",
+				Ports:     []uint32{9190},
+				Threshold: 100,
+			},
+		},
+		UrlClusteringMethod: "alphabet",
+	}
+}

type ProtocolConfig struct {
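The new UrlClusteringMethod field defaults to "alphabet" and rides through mapstructure like the other analyzer settings. A minimal sketch of how it reaches the parser factory, using only names from this PR (the "noparam" override is a hypothetical example):

	cfg := NewDefaultConfig()           // UrlClusteringMethod is "alphabet" by default
	cfg.UrlClusteringMethod = "noparam" // hypothetical override
	f := factory.NewParserFactory(factory.WithUrlClusteringMethod(cfg.UrlClusteringMethod))
	_ = f
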
12 changes: 7 additions & 5 deletions collector/analyzer/network/network_analyzer.go
@@ -39,6 +39,7 @@ type NetworkAnalyzer struct {
staticPortMap map[uint32]string
slowThresholdMap map[string]int
protocolMap map[string]*protocol.ProtocolParser
+	parserFactory    *factory.ParserFactory
parsers []*protocol.ProtocolParser

dataGroupPool *DataGroupPool
@@ -68,6 +69,7 @@ func NewNetworkAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, co
na.conntracker, _ = conntracker.NewConntracker(connConfig)
}

+	na.parserFactory = factory.NewParserFactory(factory.WithUrlClusteringMethod(na.cfg.UrlClusteringMethod))
return na
}

@@ -108,7 +110,7 @@ func (na *NetworkAnalyzer) Start() error {
na.protocolMap = map[string]*protocol.ProtocolParser{}
parsers := make([]*protocol.ProtocolParser, 0)
for _, protocol := range na.cfg.ProtocolParser {
-		protocolparser := factory.GetParser(protocol)
+		protocolparser := na.parserFactory.GetParser(protocol)
if protocolparser != nil {
na.protocolMap[protocol] = protocolparser
disableDisern, ok := disableDisernProtocols[protocol]
@@ -118,7 +120,7 @@
}
}
// Add Generic Last
-	parsers = append(parsers, factory.GetGenericParser())
+	parsers = append(parsers, na.parserFactory.GetGenericParser())
na.parsers = parsers

rand.Seed(time.Now().UnixNano())
@@ -360,7 +362,7 @@ func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.DataGroup

// Step2 Cache protocol and port
	// TODO There is a concurrent-modification case when looping. Consider thread safety.
-	cacheParsers, ok := factory.GetCachedParsersByPort(port)
+	cacheParsers, ok := na.parserFactory.GetCachedParsersByPort(port)
if ok {
for _, parser := range cacheParsers {
records := na.parseProtocol(mps, parser)
@@ -369,7 +371,7 @@ func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.DataGroup
				// Reset mapping for generic and port when the threshold is exceeded, so the data can be parsed by other protocols.
if parser.AddPortCount(port) == CACHE_RESET_THRESHOLD {
parser.ResetPort(port)
-					factory.RemoveCachedParser(port, parser)
+					na.parserFactory.RemoveCachedParser(port, parser)
}
}
return records
@@ -383,7 +385,7 @@ func (na *NetworkAnalyzer) parseProtocols(mps *messagePairs) []*model.DataGroup
if records != nil {
			// Add mapping for port and protocol when the threshold is exceeded
if parser.AddPortCount(port) == CACHE_ADD_THRESHOLD {
-				factory.AddCachedParser(port, parser)
+				na.parserFactory.AddCachedParser(port, parser)
}
return records
}
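Taken together, the two thresholds above form a learn-and-forget cycle: once a parser keeps matching traffic on a port, the port is pinned to that parser; after enough further hits the pin is dropped so other protocols get a chance again. A rough sketch with a hypothetical port, using only calls that appear in this diff:

	// Pin the parser once it has matched often enough on port 8080.
	if parser.AddPortCount(8080) == CACHE_ADD_THRESHOLD {
		na.parserFactory.AddCachedParser(8080, parser)
	}
	// Later, drop the pin so the port can be re-identified.
	if parser.AddPortCount(8080) == CACHE_RESET_THRESHOLD {
		parser.ResetPort(8080)
		na.parserFactory.RemoveCachedParser(8080, parser)
	}
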
19 changes: 19 additions & 0 deletions collector/analyzer/network/protocol/factory/config.go
@@ -0,0 +1,19 @@
+package factory
+
+type config struct {
+	urlClusteringMethod string
+}
+
+func newDefaultConfig() *config {
+	return &config{
+		urlClusteringMethod: "alphabet",
+	}
+}
+
+type Option func(cfg *config)
+
+func WithUrlClusteringMethod(urlClusteringMethod string) Option {
+	return func(cfg *config) {
+		cfg.urlClusteringMethod = urlClusteringMethod
+	}
+}
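This is the usual functional-options pattern: NewParserFactory (next file) starts from newDefaultConfig and applies each Option in order, so callers that pass no options keep the "alphabet" default. A minimal usage sketch:

	f := factory.NewParserFactory(factory.WithUrlClusteringMethod("noparam"))
	fDefault := factory.NewParserFactory() // urlClusteringMethod stays "alphabet"
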
96 changes: 48 additions & 48 deletions collector/analyzer/network/protocol/factory/factory.go
@@ -13,56 +13,56 @@ import (
"github.com/Kindling-project/kindling/collector/analyzer/network/protocol/redis"
)

-var (
-	cache_port_parsers_map = make(map[uint32][]*protocol.ProtocolParser)
-	mutex                  = sync.Mutex{}
+type ParserFactory struct {
+	cachePortParsersMap map[uint32][]*protocol.ProtocolParser
+	mutex               sync.Mutex
+	protocolParsers     map[string]*protocol.ProtocolParser

-	generic_parser *protocol.ProtocolParser = generic.NewGenericParser()
-	http_parser    *protocol.ProtocolParser = http.NewHttpParser()
-	kafka_parser   *protocol.ProtocolParser = kafka.NewKafkaParser()
-	mysql_parser   *protocol.ProtocolParser = mysql.NewMysqlParser()
-	redis_parser   *protocol.ProtocolParser = redis.NewRedisParser()
-	dubbo_parser   *protocol.ProtocolParser = dubbo.NewDubboParser()
-	dns_parser     *protocol.ProtocolParser = dns.NewDnsParser()
-)
+	config *config
+}

-func GetParser(key string) *protocol.ProtocolParser {
-	switch key {
-	case protocol.HTTP:
-		return http_parser
-	case protocol.KAFKA:
-		return kafka_parser
-	case protocol.MYSQL:
-		return mysql_parser
-	case protocol.REDIS:
-		return redis_parser
-	case protocol.DUBBO:
-		return dubbo_parser
-	case protocol.DNS:
-		return dns_parser
-	default:
-		return nil
-	}
-}
+func NewParserFactory(options ...Option) *ParserFactory {
+	factory := &ParserFactory{
+		cachePortParsersMap: make(map[uint32][]*protocol.ProtocolParser),
+		protocolParsers:     make(map[string]*protocol.ProtocolParser),
+		config:              newDefaultConfig(),
+	}
+	for _, option := range options {
+		option(factory.config)
+	}
+	factory.protocolParsers[protocol.HTTP] = http.NewHttpParser(factory.config.urlClusteringMethod)
+	factory.protocolParsers[protocol.KAFKA] = kafka.NewKafkaParser()
+	factory.protocolParsers[protocol.MYSQL] = mysql.NewMysqlParser()
+	factory.protocolParsers[protocol.REDIS] = redis.NewRedisParser()
+	factory.protocolParsers[protocol.DUBBO] = dubbo.NewDubboParser()
+	factory.protocolParsers[protocol.DNS] = dns.NewDnsParser()
+	factory.protocolParsers[protocol.NOSUPPORT] = generic.NewGenericParser()
+
+	return factory
+}

-func GetGenericParser() *protocol.ProtocolParser {
-	return generic_parser
-}
+func (f *ParserFactory) GetParser(key string) *protocol.ProtocolParser {
+	return f.protocolParsers[key]
+}

-func GetCachedParsersByPort(port uint32) ([]*protocol.ProtocolParser, bool) {
-	mutex.Lock()
-	parser, ok := cache_port_parsers_map[port]
-	mutex.Unlock()
+func (f *ParserFactory) GetGenericParser() *protocol.ProtocolParser {
+	return f.protocolParsers[protocol.NOSUPPORT]
+}

+func (f *ParserFactory) GetCachedParsersByPort(port uint32) ([]*protocol.ProtocolParser, bool) {
+	f.mutex.Lock()
+	parser, ok := f.cachePortParsersMap[port]
+	f.mutex.Unlock()

return parser, ok
}

-func AddCachedParser(port uint32, parser *protocol.ProtocolParser) {
-	mutex.Lock()
-	if val := cache_port_parsers_map[port]; val == nil {
+func (f *ParserFactory) AddCachedParser(port uint32, parser *protocol.ProtocolParser) {
+	f.mutex.Lock()
+	if val := f.cachePortParsersMap[port]; val == nil {
		parsers := make([]*protocol.ProtocolParser, 0)
		parsers = append(parsers, parser)
-		cache_port_parsers_map[port] = parsers
+		f.cachePortParsersMap[port] = parsers
} else {
exist := false
for _, value := range val {
@@ -71,31 +71,31 @@ func AddCachedParser(port uint32, parser *protocol.ProtocolParser) {
break
}
}

+		genericParser := f.GetGenericParser()
if !exist {
// Make sure Generic is last
-			if len(val) > 0 && val[len(val)-1] == generic_parser {
+			if len(val) > 0 && val[len(val)-1] == genericParser {
parsers := append(val[0:len(val)-1], parser)
-				parsers = append(parsers, generic_parser)
-				cache_port_parsers_map[port] = parsers
+				parsers = append(parsers, genericParser)
+				f.cachePortParsersMap[port] = parsers
} else {
parsers := append(val, parser)
-				cache_port_parsers_map[port] = parsers
+				f.cachePortParsersMap[port] = parsers
}
}
}
-	mutex.Unlock()
+	f.mutex.Unlock()
}

-func RemoveCachedParser(port uint32, parser *protocol.ProtocolParser) {
-	mutex.Lock()
-	if val, ok := cache_port_parsers_map[port]; ok {
+func (f *ParserFactory) RemoveCachedParser(port uint32, parser *protocol.ProtocolParser) {
+	f.mutex.Lock()
+	if val, ok := f.cachePortParsersMap[port]; ok {
for i, value := range val {
if value == parser {
val = append(val[:i], val[i+1:]...)
-				cache_port_parsers_map[port] = val
+				f.cachePortParsersMap[port] = val
}
}
}
-	mutex.Unlock()
+	f.mutex.Unlock()
}
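With the globals folded into ParserFactory, the parser set and the port cache become per-instance state, and GetParser now reports an unsupported protocol through the map's nil zero value instead of the old switch's default branch. A short sketch ("smtp" is a hypothetical, unregistered key):

	f := NewParserFactory()
	httpParser := f.GetParser(protocol.HTTP) // registered in NewParserFactory
	unknown := f.GetParser("smtp")           // nil, so callers must nil-check
	_, _ = httpParser, unknown
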
14 changes: 12 additions & 2 deletions collector/analyzer/network/protocol/http/http_parser.go
@@ -4,10 +4,20 @@ import (
"strings"

"github.com/Kindling-project/kindling/collector/analyzer/network/protocol"
"github.com/Kindling-project/kindling/collector/pkg/urlclustering"
)

-func NewHttpParser() *protocol.ProtocolParser {
-	requestParser := protocol.CreatePkgParser(fastfailHttpRequest(), parseHttpRequest())
+func NewHttpParser(urlClusteringMethod string) *protocol.ProtocolParser {
+	var method urlclustering.ClusteringMethod
+	switch urlClusteringMethod {
+	case "alphabet":
+		method = urlclustering.NewAlphabeticalClusteringMethod()
+	case "noparam":
+		method = urlclustering.NewNoParamClusteringMethod()
+	default:
+		method = urlclustering.NewAlphabeticalClusteringMethod()
+	}
+	requestParser := protocol.CreatePkgParser(fastfailHttpRequest(), parseHttpRequest(method))
responseParser := protocol.CreatePkgParser(fastfailHttpResponse(), parseHttpResponse())

return protocol.NewProtocolParser(protocol.HTTP, requestParser, responseParser, nil)
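An unrecognized value, including the empty string the updated tests pass below, falls back to the alphabetical method. Based on the descriptions in the config file at the end of this PR, the two methods should behave roughly as follows; the exact outputs are an assumption about the urlclustering package, not something shown in this diff:

	noparam := urlclustering.NewNoParamClusteringMethod()
	_ = noparam.Clustering("/api/v1/users/42?page=2") // assumed: "/api/v1/users/42", only the query is trimmed

	alphabet := urlclustering.NewAlphabeticalClusteringMethod()
	_ = alphabet.Clustering("/api/v1/users/42?page=2") // assumed: "/api/*/users/*", non-alphabetical segments become '*'
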
4 changes: 2 additions & 2 deletions collector/analyzer/network/protocol/http/http_parser_test.go
@@ -98,7 +98,7 @@ func TestParseHttpRequest_GetPayLoad(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
protocol.SetPayLoadLength(protocol.HTTP, tt.size)
message := protocol.NewRequestMessage([]byte(httpData))
-			NewHttpParser().ParseRequest(message)
+			NewHttpParser("").ParseRequest(message)

if !message.HasAttribute(constlabels.HttpRequestPayload) {
t.Errorf("Fail to parse HttpRequest()")
@@ -127,7 +127,7 @@ func TestParseHttpResponse_GetPayLoad(t *testing.T) {
protocol.SetPayLoadLength(protocol.HTTP, tt.size)

message := protocol.NewResponseMessage([]byte(httpData), model.NewAttributeMap())
-			NewHttpParser().ParseResponse(message)
+			NewHttpParser("").ParseResponse(message)

if !message.HasAttribute(constlabels.HttpResponsePayload) {
t.Errorf("Fail to parse HttpResponse()")
5 changes: 3 additions & 2 deletions collector/analyzer/network/protocol/http/http_request.go
@@ -4,6 +4,7 @@ import (
"strings"

"github.com/Kindling-project/kindling/collector/analyzer/tools"
"github.com/Kindling-project/kindling/collector/pkg/urlclustering"

"github.com/Kindling-project/kindling/collector/analyzer/network/protocol"
"github.com/Kindling-project/kindling/collector/model/constlabels"
@@ -26,7 +27,7 @@ Request line
Request header
Request body
*/
-func parseHttpRequest() protocol.ParsePkgFn {
+func parseHttpRequest(urlClusteringMethod urlclustering.ClusteringMethod) protocol.ParsePkgFn {
return func(message *protocol.PayloadMessage) (bool, bool) {
offset, method := message.ReadUntilBlankWithLength(message.Offset, 8)

@@ -55,7 +56,7 @@ func parseHttpRequest() protocol.ParsePkgFn {
message.AddByteArrayUtf8Attribute(constlabels.HttpUrl, url)
message.AddByteArrayUtf8Attribute(constlabels.HttpRequestPayload, message.GetData(0, protocol.GetHttpPayLoadLength()))

-		contentKey := getContentKey(string(url))
+		contentKey := urlClusteringMethod.Clustering(string(url))
if len(contentKey) == 0 {
contentKey = "*"
}
7 changes: 7 additions & 0 deletions collector/docker/kindling-collector-config.yml
@@ -49,6 +49,13 @@ analyzers:
# The protocol parsers which are enabled.
# When dissectors are enabled, the agent will analyze the payload and enrich the metric/trace with its content.
protocol_parser: [ http, mysql, dns, redis, kafka, dubbo ]
+# Which URL clustering method should be used to shorten the URL of an HTTP request.
+# This is useful for decreasing the cardinality of URLs.
+# Valid values: ["noparam", "alphabet"]
+# - noparam: Only trim the trailing parameters behind the character '?'.
+# - alphabet: Trim the trailing parameters and convert the segments
+#   containing non-alphabetical characters to a star (*).
+url_clustering_method: alphabet
# If the destination port of data is one of the followings, the protocol of such network request
# is set to the corresponding one. Note the program will try to identify the protocol automatically
# for the ports that are not in the lists, in which case the cpu usage will be increased much inevitably.