Skip to content

Commit

Permalink
rocketmq consumer support some config item (#2811)
Browse files Browse the repository at this point in the history
Signed-off-by: Naah <nayan3480232@163.com>
Signed-off-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
3 people committed May 4, 2023
1 parent 9f4086f commit 615d491
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 13 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/aliyun/aliyun-tablestore-go-sdk v1.7.7
github.com/apache/dubbo-go-hessian2 v1.11.5
github.com/apache/pulsar-client-go v0.9.0
github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230412142645-25003f6f083d
github.com/apache/thrift v0.13.0
github.com/aws/aws-sdk-go v1.44.214
github.com/benbjohnson/clock v1.3.0
Expand Down Expand Up @@ -335,7 +335,6 @@ require (
github.com/sony/gobreaker v0.5.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stathat/consistent v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tchap/go-patricia/v2 v2.3.1 // indirect
github.com/tidwall/gjson v1.13.0 // indirect
Expand Down Expand Up @@ -398,6 +397,7 @@ require (
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
stathat.com/c/consistent v1.0.0 // indirect
)

replace github.com/gobwas/pool => github.com/gobwas/pool v0.2.1
Expand Down
15 changes: 7 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ github.com/AzureAD/microsoft-authentication-library-for-go v0.8.1/go.mod h1:4qFo
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 h1:OBhqkivkhkMqLPymWEppkm7vgPQY2XsHoEkaMQ0AdZY=
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0/go.mod h1:kgDmCTgBzIEPFElEF+FK0SdjAor06dRq2Go927dnQ6o=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
Expand Down Expand Up @@ -569,8 +570,8 @@ github.com/apache/pulsar-client-go v0.9.0/go.mod h1:fSAcBipgz4KQ/VgwZEJtQ71cCXMK
github.com/apache/rocketmq-client-go v1.2.5 h1:2hPoLHpMJy1a57HDNmx7PZKgvlgVYO1Alz925oeqphQ=
github.com/apache/rocketmq-client-go v1.2.5/go.mod h1:Kap8oXIVLlHF50BGUbN9z97QUp1GaK1nOoCfsZnR2bw=
github.com/apache/rocketmq-client-go/v2 v2.1.0/go.mod h1:oEZKFDvS7sz/RWU0839+dQBupazyBV7WX5cP6nrio0Q=
github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2 h1:UQHWhwyw3tSLRhp0lVn/r/uNUzDnBZcDekGSzaXfz0M=
github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2/go.mod h1:DDYjQ9wxYmJLjgNK4+RqyFE8/13gLK/Bugz4U6zD5MI=
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230412142645-25003f6f083d h1:bMkOgl4AMaJR0z6Bcxlyliwou/wZifxhRe/rK9TF5+A=
github.com/apache/rocketmq-client-go/v2 v2.1.2-0.20230412142645-25003f6f083d/go.mod h1:6I6vgxHR3hzrvn+6n/4mrhS+UTulzK/X9LB2Vk1U5gE=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
Expand Down Expand Up @@ -1614,13 +1615,15 @@ github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/ginkgo/v2 v2.4.0 h1:+Ig9nvqgS5OBSACXNk15PLdp0U9XPYROt9CFzVdFGIs=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/onsi/gomega v1.23.0 h1:/oxKu9c2HVap+F3PfKort2Hw5DEU+HGlW8n+tguWsys=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
Expand Down Expand Up @@ -1794,6 +1797,7 @@ github.com/sijms/go-ora/v2 v2.6.11 h1:inBa/Tp0/kEl2prd3p5VabDXvmgVEelg328RYwsOCi
github.com/sijms/go-ora/v2 v2.6.11/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
Expand Down Expand Up @@ -1841,8 +1845,6 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/spf13/viper v1.14.0 h1:Rg7d3Lo706X9tHsJMUjdiwMpHB7W8WnSVOssIY+JElU=
github.com/stathat/consistent v1.0.0 h1:ZFJ1QTRn8npNBKW065raSZ8xfOqhpb8vLOkfp4CcL/U=
github.com/stathat/consistent v1.0.0/go.mod h1:uajTPbgSygZBJ+V+0mY7meZ8i0XAcZs7AQ6V121XSxw=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
Expand Down Expand Up @@ -1882,17 +1884,14 @@ github.com/tetratelabs/wazero v1.1.0 h1:EByoAhC+QcYpwSZJSs/aV0uokxPwBgKxfiokSUwA
github.com/tetratelabs/wazero v1.1.0/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ=
github.com/tevid/gohamcrest v1.1.1/go.mod h1:3UvtWlqm8j5JbwYZh80D/PVBt0mJ1eJiYgZMibh0H/k=
github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
github.com/tidwall/gjson v1.8.1/go.mod h1:5/xDoumyyDNerp2U36lyolv46b3uF/9Bu6OfyQ9GImk=
github.com/tidwall/gjson v1.9.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.13.0 h1:3TFY9yxOQShrvmjdM76K+jc66zJeT6D3/VFFYCGQf7M=
github.com/tidwall/gjson v1.13.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/match v1.0.3/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
Expand Down
2 changes: 1 addition & 1 deletion pubsub/rocketmq/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ type rocketMQMetaData struct {
// then pullThresholdForQueue will be set to 100
//
// RocketMQ Go Client does not support configuration in github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2
PullThresholdForTopic int64 `mapstructure:"pullThresholdForTopic"`
PullThresholdForTopic int `mapstructure:"pullThresholdForTopic"`

// RocketMQ Go Client does not support configuration in github.com/apache/rocketmq-client-go/v2 v2.1.1-rc2
PullThresholdSizeForQueue int `mapstructure:"pullThresholdSizeForQueue"`
Expand Down
2 changes: 1 addition & 1 deletion pubsub/rocketmq/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestMetaDataDecode(t *testing.T) {
assert.Equal(t, int32(10), metaData.PullBatchSize)
assert.Equal(t, int(10), metaData.ConsumerBatchSize)
assert.Equal(t, int64(100), metaData.PullThresholdForQueue)
assert.Equal(t, int64(100), metaData.PullThresholdForTopic)
assert.Equal(t, int(100), metaData.PullThresholdForTopic)
assert.Equal(t, 10, metaData.PullThresholdSizeForQueue)
assert.Equal(t, 10, metaData.PullThresholdSizeForTopic)
assert.Equal(t, "json", metaData.ContentType)
Expand Down
24 changes: 24 additions & 0 deletions pubsub/rocketmq/rocketmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ func (r *rocketMQ) setUpConsumer() (mq.PushConsumer, error) {
"we will use default value [ConsumeFromLastOffset]", r.name, r.metadata.FromWhere)
}
}
if r.metadata.ConsumeTimestamp != "" {
opts = append(opts, mqc.WithConsumeTimestamp(r.metadata.ConsumeTimestamp))
}
if r.metadata.ConsumeOrderly != "" {
if utils.IsTruthy(r.metadata.ConsumeOrderly) {
opts = append(opts, mqc.WithConsumerOrder(true))
Expand All @@ -195,12 +198,21 @@ func (r *rocketMQ) setUpConsumer() (mq.PushConsumer, error) {
if r.metadata.ConsumeMessageBatchMaxSize > 0 {
opts = append(opts, mqc.WithConsumeMessageBatchMaxSize(r.metadata.ConsumeMessageBatchMaxSize))
}
if r.metadata.ConsumeConcurrentlyMaxSpan > 0 {
opts = append(opts, mqc.WithConsumeConcurrentlyMaxSpan(r.metadata.ConsumeConcurrentlyMaxSpan))
}
if r.metadata.MaxReconsumeTimes > 0 {
opts = append(opts, mqc.WithMaxReconsumeTimes(r.metadata.MaxReconsumeTimes))
}
if r.metadata.AutoCommit != "" {
opts = append(opts, mqc.WithAutoCommit(utils.IsTruthy(r.metadata.AutoCommit)))
}
if r.metadata.ConsumeTimeout > 0 {
opts = append(opts, mqc.WithConsumeTimeout(time.Duration(r.metadata.ConsumeTimeout)*time.Minute))
}
if r.metadata.ConsumerPullTimeout > 0 {
opts = append(opts, mqc.WithConsumerPullTimeout(time.Duration(r.metadata.ConsumerPullTimeout)*time.Second))
}
if r.metadata.PullInterval > 0 {
opts = append(opts, mqc.WithPullInterval(time.Duration(r.metadata.PullInterval)*time.Millisecond))
}
Expand All @@ -212,6 +224,18 @@ func (r *rocketMQ) setUpConsumer() (mq.PushConsumer, error) {
r.logger.Warn("set the number of msg pulled from the broker at a time, " +
"please use pullBatchSize instead of consumerBatchSize")
}
if r.metadata.PullThresholdForQueue > 0 {
opts = append(opts, mqc.WithPullThresholdForQueue(r.metadata.PullThresholdForQueue))
}
if r.metadata.PullThresholdForTopic > 0 {
opts = append(opts, mqc.WithPullThresholdForTopic(r.metadata.PullThresholdForTopic))
}
if r.metadata.PullThresholdSizeForQueue > 0 {
opts = append(opts, mqc.WithPullThresholdSizeForQueue(r.metadata.PullThresholdSizeForQueue))
}
if r.metadata.PullThresholdSizeForTopic > 0 {
opts = append(opts, mqc.WithPullThresholdSizeForTopic(r.metadata.PullThresholdSizeForTopic))
}
c, e := mqc.NewPushConsumer(opts...)
if e != nil {
return nil, e
Expand Down
3 changes: 2 additions & 1 deletion pubsub/rocketmq/rocketmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
func getTestMetadata() map[string]string {
return map[string]string{
"nameServer": "127.0.0.1:9876",
"consumerGroup": "dapr.rocketmq.producer",
"consumerGroup": "dapr_rocketmq_producer",
"accessKey": "RocketMQ",
"secretKey": "12345",
"consumeTimeout": "20",
"consumerBatchSize": "1",
"consumerThreadNums": "2",
"retries": "2",
Expand Down

0 comments on commit 615d491

Please sign in to comment.