diff --git a/CHANGELOG.md b/CHANGELOG.md index 2952f516a..707d50c8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,11 +9,13 @@ - ### Enhancements -- Improve Kindling Event log format.([#455](https://github.com/KindlingProject/kindling/pull/455)) +- Improve Kindling Event log format. ([#455](https://github.com/KindlingProject/kindling/pull/455)) - ### Bug fixes -- Fix retransmission count is not consistent with the real value on Linux 4.7 or higher([#450](https://github.com/KindlingProject/kindling/pull/450)) +- Fix the potential endless loop in the rocketmq parser. ([#465](https://github.com/KindlingProject/kindling/pull/465)) +- Fix retransmission count is not consistent with the real value on Linux 4.7 or higher. ([#450](https://github.com/KindlingProject/kindling/pull/450)) +- Reduce the cases pods are not found when they are daemonset. ([#439](https://github.com/KindlingProject/kindling/pull/439) @llhhbc) - Collector subscribes `sendmmsg` events to fix the bug that some DNS requests are missed. ([#430](https://github.com/KindlingProject/kindling/pull/430)) - Fix the bug that the agent panics when it receives DeletedFinalStateUnknown by watching K8s metadata. ([#456](https://github.com/KindlingProject/kindling/pull/456)) - diff --git a/collector/pkg/component/analyzer/network/protocol/rocketmq/rocketmq_request.go b/collector/pkg/component/analyzer/network/protocol/rocketmq/rocketmq_request.go index 655674356..870b680de 100644 --- a/collector/pkg/component/analyzer/network/protocol/rocketmq/rocketmq_request.go +++ b/collector/pkg/component/analyzer/network/protocol/rocketmq/rocketmq_request.go @@ -3,6 +3,7 @@ package rocketmq import ( "encoding/json" "fmt" + "github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol" "github.com/Kindling-project/kindling/collector/pkg/model/constlabels" ) @@ -63,6 +64,7 @@ func parseHeader(message *protocol.PayloadMessage, header *rocketmqHeader) { remarkLen int32 extFieldLen int32 offset int + err error ) message.ReadInt16(8, &header.Code) header.LanguageCode = message.Data[10] @@ -83,10 +85,22 @@ func parseHeader(message *protocol.PayloadMessage, header *rocketmqHeader) { //offset starts from 29 var extFieldBytesLen = 0 for extFieldBytesLen < int(extFieldLen) && extFieldBytesLen+29 < len(message.Data) { - offset, _ = message.ReadInt16(offset, &keyLen) - offset, key, _ = message.ReadBytes(offset, int(keyLen)) - offset, _ = message.ReadInt32(offset, &valueLen) - offset, value, _ = message.ReadBytes(offset, int(valueLen)) + offset, err = message.ReadInt16(offset, &keyLen) + if err != nil { + break + } + offset, key, err = message.ReadBytes(offset, int(keyLen)) + if err != nil { + break + } + offset, err = message.ReadInt32(offset, &valueLen) + if err != nil { + break + } + offset, value, err = message.ReadBytes(offset, int(valueLen)) + if err != nil { + break + } extFieldMap[string(key)] = string(value) extFieldBytesLen = extFieldBytesLen + 2 + int(keyLen) + 4 + int(valueLen) if string(key) == "topic" || string(key) == "b" { diff --git a/collector/pkg/component/analyzer/network/protocol/rocketmq/rocketmq_request_test.go b/collector/pkg/component/analyzer/network/protocol/rocketmq/rocketmq_request_test.go new file mode 100644 index 000000000..4f1426520 --- /dev/null +++ b/collector/pkg/component/analyzer/network/protocol/rocketmq/rocketmq_request_test.go @@ -0,0 +1,47 @@ +package rocketmq + +import ( + "testing" + "time" + + "github.com/Kindling-project/kindling/collector/pkg/component/analyzer/network/protocol" +) + +func Test_parseHeader(t *testing.T) { + type args struct { + message *protocol.PayloadMessage + header *rocketmqHeader + } + tests := []struct { + name string + args args + }{ + { + name: "busy-loop", + args: args{ + message: protocol.NewRequestMessage([]byte{ + 1, 23, 4, 20, 123, 213, 4, 2, 34, 12, 23, 1, 23, 4, 20, 123, + 213, 4, 2, 34, 12, 0, 0, 0, 0, 20, 123, 213, 4, 254, 34, 12, + 23, 1, 23, 4, 20, 123, 213, 4, 2, 34, 12, 23, 1, 23, 4, 20, + 123, 213, 4, 2, 34, 12, 23}), + header: &rocketmqHeader{ExtFields: map[string]string{}}, + }, + }, + } + finishCh := make(chan bool) + go func() { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + parseHeader(tt.args.message, tt.args.header) + }) + } + finishCh <- true + }() + + select { + case <-time.After(5 * time.Second): + t.Fatal("The test case didn't finish in 5 seconds.") + case <-finishCh: + return + } +}