Skip to content

Commit

Permalink
Support Protocol Dubbo2 (#184)
Browse files Browse the repository at this point in the history
* Support Protocol Dubbo2

The changes of config.yaml
1. Remove http_payload_length
2. Add payload_length in protocol_config for all protocols
3. Add dubbo in protocol_parser

Dubbo Request Output:
1. content_key
2. request_payload

Dubbo Response Output:
1. dubbo_error_code
2. response_payload
3. is_error
4. error_type

Signed-off-by: huxiangyuan <huxiangyuan@harmonycloud.cn>

* feat(net-adapter): support to exporter dubbo metric info

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* test: add testcase of label_converter for dubbo

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* docs: update prometheus_metrics about dubbo info

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* docs: update prometheus metrics document

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* docs: update prometheus metrics document

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* test: update testcase for code change

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* style(dubbo-parser): Using camel case instead of snake case.

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* docs: add comments on payload_length

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* docs: add comments on payload_length

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

* refactor(procotol): Set default protocol length to 80 and extract a method `GetPayloadLength(protocol string)`

Signed-off-by: niejiangang <niejiangang@harmonycloud.cn>

Co-authored-by: niejiangang <niejiangang@harmonycloud.cn>
Co-authored-by: Daxin Wang <daxinwang@harmonycloud.cn>
  • Loading branch information
3 people authored May 12, 2022
1 parent 8a2b600 commit 1a2ead6
Show file tree
Hide file tree
Showing 24 changed files with 713 additions and 47 deletions.
15 changes: 3 additions & 12 deletions collector/analyzer/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const (
defaultRequestTimeout = 1
defaultConnectTimeout = 1
defaultResponseSlowThreshold = 500
defaultHttpPayloadLength = 200
)

type Config struct {
Expand All @@ -18,14 +17,14 @@ type Config struct {
ConntrackRateLimit int `mapstructure:"conntrack_rate_limit"`
ProcRoot string `mapstructure:"proc_root"`

ProtocolParser []string `mapstructure:"protocol_parser"`
HttpPayloadLength int `mapstructure:"http_payload_length"`
ProtocolConfigs []ProtocolConfig `mapstructure:"protocol_config,omitempty"`
ProtocolParser []string `mapstructure:"protocol_parser"`
ProtocolConfigs []ProtocolConfig `mapstructure:"protocol_config,omitempty"`
}

type ProtocolConfig struct {
Key string `mapstructure:"key,omitempty"`
Ports []uint32 `mapstructure:"ports,omitempty"`
PayloadLength int `mapstructure:"payload_length"`
DisableDiscern bool `mapstructure:"disable_discern,omitempty"`
Threshold int `mapstructure:"slow_threshold,omitempty"`
}
Expand Down Expand Up @@ -53,11 +52,3 @@ func (cfg *Config) getResponseSlowThreshold() int {
return defaultResponseSlowThreshold
}
}

func (cfg *Config) getHttpPayloadLength() int {
if cfg.HttpPayloadLength > 0 {
return cfg.HttpPayloadLength
} else {
return defaultHttpPayloadLength
}
}
5 changes: 3 additions & 2 deletions collector/analyzer/network/network_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func (na *NetworkAnalyzer) Start() error {

go na.consumerFdNoReusingTrace()

protocol.SetHttpPayLoadLength(na.cfg.getHttpPayloadLength())
na.staticPortMap = map[uint32]string{}
for _, config := range na.cfg.ProtocolConfigs {
for _, port := range config.Ports {
Expand All @@ -87,6 +86,7 @@ func (na *NetworkAnalyzer) Start() error {
na.slowThresholdMap = map[string]int{}
disableDisernProtocols := map[string]bool{}
for _, config := range na.cfg.ProtocolConfigs {
protocol.SetPayLoadLength(config.Key, config.PayloadLength)
na.slowThresholdMap[config.Key] = config.Threshold
disableDisernProtocols[config.Key] = config.DisableDiscern
}
Expand Down Expand Up @@ -591,7 +591,8 @@ func (na *NetworkAnalyzer) isSlow(duration uint64, protocol string) bool {
}

func (na *NetworkAnalyzer) getResponseSlowThreshold(protocol string) int {
if value, ok := na.slowThresholdMap[protocol]; ok {
if value, ok := na.slowThresholdMap[protocol]; ok && value > 0 {
// If value is not set, use response_slow_threshold by default.
return value
}
return na.cfg.getResponseSlowThreshold()
Expand Down
6 changes: 6 additions & 0 deletions collector/analyzer/network/network_analyzer_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
BENCH_CASE_DNS = "dns"
BENCH_CASE_KAFKA_PRODUCER = "kafka_producer"
BENCH_CASE_KAFKA_FETCHER = "kafka_fetcher"
BENCH_CASE_DUBBO = "dubbo"
)

var benchCaseMap = map[string]benchCase{
Expand All @@ -29,6 +30,7 @@ var benchCaseMap = map[string]benchCase{
BENCH_CASE_DNS: {protocol.DNS, "dns/server-event.yml", "dns/1k-trace.yml"},
BENCH_CASE_KAFKA_PRODUCER: {protocol.KAFKA, "kafka/provider-event.yml", "kafka/1k-provider-trace.yml"},
BENCH_CASE_KAFKA_FETCHER: {protocol.KAFKA, "kafka/consumer-event.yml", "kafka/1k-consumer-trace.yml"},
BENCH_CASE_DUBBO: {protocol.DUBBO, "dubbo/server-event.yml", "dubbo/1k-trace.yml"},
}

const (
Expand Down Expand Up @@ -59,6 +61,10 @@ func BenchmarkKafkaFetcher(b *testing.B) {
testProtocolBench(b, b.N, SIZE_MESSAGE_PAIR, BENCH_CASE_KAFKA_FETCHER)
}

func BenchmarkDubo(b *testing.B) {
testProtocolBench(b, b.N, SIZE_MESSAGE_PAIR, BENCH_CASE_DUBBO)
}

func testProtocolBench(b *testing.B, tps int, mpSize int, caseKey string) {
na := prepareNetworkAnalyzer()
if na == nil {
Expand Down
5 changes: 5 additions & 0 deletions collector/analyzer/network/network_analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func TestKafkaProtocol(t *testing.T) {
"kafka/consumer-trace-fetch-multi-topics.yml")
}

func TestDubboProtocol(t *testing.T) {
testProtocol(t, "dubbo/server-event.yml",
"dubbo/server-trace-short.yml")
}

type NopProcessor struct {
}

Expand Down
48 changes: 48 additions & 0 deletions collector/analyzer/network/protocol/dubbo/dubbo_parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package dubbo

import "github.com/Kindling-project/kindling/collector/analyzer/network/protocol"

const (
// Zero : byte zero
Zero = byte(0x00)

// magic header
MagicHigh = byte(0xda)
MagicLow = byte(0xbb)

// message flag.
FlagRequest = byte(0x80)
FlagTwoWay = byte(0x40)
FlagEvent = byte(0x20) // for heartbeat
SerialMask = 0x1f

AsciiLow = byte(0x20)
AsciiHigh = byte(0x7e)
AsciiReplace = byte(0x2e) // .
)

func NewDubboParser() *protocol.ProtocolParser {
requestParser := protocol.CreatePkgParser(fastfailDubboRequest(), parseDubboRequest())
responseParser := protocol.CreatePkgParser(fastfailDubboResponse(), parseDubboResponse())
return protocol.NewProtocolParser(protocol.DUBBO, requestParser, responseParser, nil)
}

/**
Get the ascii readable string, replace other value to '.', like wireshark.
*/
func getAsciiString(data []byte) string {
length := len(data)
if length == 0 {
return ""
}

newData := make([]byte, length)
for i := 0; i < length; i++ {
if data[i] > AsciiHigh || data[i] < AsciiLow {
newData[i] = AsciiReplace
} else {
newData[i] = data[i]
}
}
return string(newData)
}
67 changes: 67 additions & 0 deletions collector/analyzer/network/protocol/dubbo/dubbo_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package dubbo

import (
"github.com/Kindling-project/kindling/collector/analyzer/network/protocol"
"github.com/Kindling-project/kindling/collector/model/constlabels"
)

func fastfailDubboRequest() protocol.FastFailFn {
return func(message *protocol.PayloadMessage) bool {
return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
}
}

func parseDubboRequest() protocol.ParsePkgFn {
return func(message *protocol.PayloadMessage) (bool, bool) {
contentKey := getContentKey(message.Data)
if contentKey == "" {
return false, true
}

message.AddStringAttribute(constlabels.ContentKey, contentKey)
message.AddStringAttribute(constlabels.DubboRequestPayload, getAsciiString(message.GetData(16, protocol.GetDubboPayLoadLength())))
return true, true
}
}

func getContentKey(requestData []byte) string {
serialID := requestData[2] & SerialMask
if serialID == Zero {
return ""
}
if (requestData[2] & FlagEvent) != Zero {
return "Heartbeat"
}
if (requestData[2] & FlagRequest) == Zero {
// Invalid Data
return ""
}
if (requestData[2] & FlagTwoWay) == Zero {
// Ignore Oneway Data
return "Oneway"
}

serializer := GetSerializer(serialID)
if serializer == serialUnsupport {
// Unsupport Serial. only support hessian and fastjson.
return "UnSupportSerialFormat"
}

var (
service string
method string
)
// version
offset := serializer.eatString(requestData, 16)

// service name
offset, service = serializer.getStringValue(requestData, offset)

// service version
offset = serializer.eatString(requestData, offset)

// method name
_, method = serializer.getStringValue(requestData, offset)

return service + "#" + method
}
45 changes: 45 additions & 0 deletions collector/analyzer/network/protocol/dubbo/dubbo_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package dubbo

import (
"github.com/Kindling-project/kindling/collector/analyzer/network/protocol"
"github.com/Kindling-project/kindling/collector/model/constlabels"
)

func fastfailDubboResponse() protocol.FastFailFn {
return func(message *protocol.PayloadMessage) bool {
return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
}
}

func parseDubboResponse() protocol.ParsePkgFn {
return func(message *protocol.PayloadMessage) (bool, bool) {
errorCode := getErrorCode(message.Data)
if errorCode == -1 {
return false, true
}

message.AddIntAttribute(constlabels.DubboErrorCode, errorCode)
if errorCode > 20 {
message.AddBoolAttribute(constlabels.IsError, true)
message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
}
message.AddStringAttribute(constlabels.DubboResponsePayload, getAsciiString(message.GetData(16, protocol.GetDubboPayLoadLength())))
return true, true
}
}

func getErrorCode(responseData []byte) int64 {
SerialID := responseData[2] & SerialMask
if SerialID == Zero {
return -1
}
if (responseData[2] & FlagEvent) != Zero {
return 20
}
if (responseData[2] & FlagRequest) != Zero {
// Invalid Data
return -1
}

return int64(responseData[3])
}
Loading

0 comments on commit 1a2ead6

Please sign in to comment.