Skip to content

Commit

Permalink
rocketmq add some param (#3879)
Browse files Browse the repository at this point in the history
  • Loading branch information
panzhi33 authored Oct 31, 2024
1 parent 7630bb4 commit bd6475f
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 3 deletions.
2 changes: 1 addition & 1 deletion spring-cloud-alibaba-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<nacos.client.version>2.4.2</nacos.client.version>
<seata.version>2.1.0</seata.version>
<!-- Apache RocketMQ -->
<rocketmq.version>5.1.4</rocketmq.version>
<rocketmq.version>5.3.1</rocketmq.version>

<!-- Spring AI -->
<spring.ai.version>1.0.0-M1</spring.ai.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
Expand Down Expand Up @@ -76,6 +77,7 @@ public static DefaultMQPushConsumer initPushConsumer(
consumer.setInstanceName(
RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup()));
consumer.setNamespace(consumerProperties.getNamespace());
consumer.setNamespaceV2(consumerProperties.getNamespaceV2());
consumer.setNamesrvAddr(consumerProperties.getNameServer());
consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel()));
consumer.setUseTLS(consumerProperties.getUseTLS());
Expand All @@ -91,6 +93,10 @@ public static DefaultMQPushConsumer initPushConsumer(
consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency());
consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency());
consumer.setUnitName(consumerProperties.getUnitName());
consumer.setMaxReconsumeTimes(
consumerProperties.getPush().getMaxReconsumeTimes());
consumer.setConsumeTimeout(consumerProperties.getPush().getConsumeTimeout());
consumer.setAccessChannel(AccessChannel.valueOf(consumerProperties.getAccessChannel()));
return consumer;
}

Expand Down Expand Up @@ -144,6 +150,7 @@ public static DefaultLitePullConsumer initPullConsumer(
}
consumer.setNamesrvAddr(consumerProperties.getNameServer());
consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel()));
consumer.setNamespaceV2(consumerProperties.getNamespaceV2());
consumer.setUseTLS(consumerProperties.getUseTLS());
consumer.setPullTimeDelayMillsWhenException(
consumerProperties.getPullTimeDelayMillsWhenException());
Expand All @@ -162,6 +169,7 @@ public static DefaultLitePullConsumer initPullConsumer(
consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension()
.getPull().getPullThresholdForAll());
consumer.setUnitName(consumerProperties.getUnitName());
consumer.setAccessChannel(AccessChannel.valueOf(consumerProperties.getAccessChannel()));
return consumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
Expand Down Expand Up @@ -78,11 +79,12 @@ public static DefaultMQProducer initRocketMQProducer(String topic,
if (RocketMQProducerProperties.ProducerType.Trans
.equalsName(producerProperties.getProducerType())) {
producer = new TransactionMQProducer(producerProperties.getNamespace(),
producerProperties.getGroup(), rpcHook);
producerProperties.getGroup(), rpcHook, producerProperties.getEnableMsgTrace(),
producerProperties.getCustomizedTraceTopic());
if (producerProperties.getEnableMsgTrace()) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(
producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE,
producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE, 10,
producerProperties.getCustomizedTraceTopic(), rpcHook);
dispatcher.setHostProducer(producer.getDefaultMQProducerImpl());
Field field = DefaultMQProducer.class
Expand Down Expand Up @@ -110,6 +112,7 @@ public static DefaultMQProducer initRocketMQProducer(String topic,
producer.setInstanceName(
RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid()));
producer.setNamesrvAddr(producerProperties.getNameServer());
producer.setNamespaceV2(producerProperties.getNamespaceV2());
producer.setSendMsgTimeout(producerProperties.getSendMsgTimeout());
producer.setRetryTimesWhenSendFailed(
producerProperties.getRetryTimesWhenSendFailed());
Expand All @@ -122,6 +125,7 @@ public static DefaultMQProducer initRocketMQProducer(String topic,
producer.setMaxMessageSize(producerProperties.getMaxMessageSize());
producer.setUseTLS(producerProperties.getUseTLS());
producer.setUnitName(producerProperties.getUnitName());
producer.setAccessChannel(AccessChannel.valueOf(producerProperties.getAccessChannel()));
CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean(
producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class);
if (null != checkForbiddenHook) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class RocketMQCommonProperties implements Serializable {

private String namespace;

private String namespaceV2;

/**
* The property of "unitName".
*/
Expand Down Expand Up @@ -140,6 +142,14 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}

public String getNamespaceV2() {
return namespaceV2;
}

public void setNamespaceV2(String namespaceV2) {
this.namespaceV2 = namespaceV2;
}

public String getAccessChannel() {
return accessChannel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ public static class Push implements Serializable {
*/
private int consumeMessageBatchMaxSize = 1;

/**
* Maximum amount of time in minutes a message may block the consuming thread.
* Unit: Minutes
*/
private long consumeTimeout = 15;

public boolean getOrderly() {
return orderly;
}
Expand Down Expand Up @@ -368,6 +374,13 @@ public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
}

public long getConsumeTimeout() {
return consumeTimeout;
}

public void setConsumeTimeout(long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}
}

public static class Pull implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public static <T extends RocketMQCommonProperties> T mergeRocketMQProperties(
if (StringUtils.isEmpty(mqProperties.getNamespace())) {
mqProperties.setNamespace(binderConfigurationProperties.getNamespace());
}
if (StringUtils.isEmpty(mqProperties.getNamespaceV2())) {
mqProperties.setNamespaceV2(binderConfigurationProperties.getNamespaceV2());
}
if (StringUtils.isEmpty(mqProperties.getGroup())) {
mqProperties.setGroup(binderConfigurationProperties.getGroup());
}
Expand Down

0 comments on commit bd6475f

Please sign in to comment.