diff --git a/spring-cloud-alibaba-dependencies/pom.xml b/spring-cloud-alibaba-dependencies/pom.xml index 5bf00ea3dc..3cbfa29ecc 100644 --- a/spring-cloud-alibaba-dependencies/pom.xml +++ b/spring-cloud-alibaba-dependencies/pom.xml @@ -23,7 +23,7 @@ 2.4.2 2.1.0 - 5.1.4 + 5.3.1 1.0.0-M1 diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java index 02cae48b24..b6c0635ab3 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQConsumerFactory.java @@ -21,6 +21,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -76,6 +77,7 @@ public static DefaultMQPushConsumer initPushConsumer( consumer.setInstanceName( RocketMQUtils.getInstanceName(rpcHook, consumerProperties.getGroup())); consumer.setNamespace(consumerProperties.getNamespace()); + consumer.setNamespaceV2(consumerProperties.getNamespaceV2()); consumer.setNamesrvAddr(consumerProperties.getNameServer()); consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel())); consumer.setUseTLS(consumerProperties.getUseTLS()); @@ -91,6 +93,10 @@ public static DefaultMQPushConsumer initPushConsumer( consumer.setConsumeThreadMin(extendedConsumerProperties.getConcurrency()); consumer.setConsumeThreadMax(extendedConsumerProperties.getConcurrency()); consumer.setUnitName(consumerProperties.getUnitName()); + consumer.setMaxReconsumeTimes( + consumerProperties.getPush().getMaxReconsumeTimes()); + consumer.setConsumeTimeout(consumerProperties.getPush().getConsumeTimeout()); + consumer.setAccessChannel(AccessChannel.valueOf(consumerProperties.getAccessChannel())); return consumer; } @@ -144,6 +150,7 @@ public static DefaultLitePullConsumer initPullConsumer( } consumer.setNamesrvAddr(consumerProperties.getNameServer()); consumer.setMessageModel(getMessageModel(consumerProperties.getMessageModel())); + consumer.setNamespaceV2(consumerProperties.getNamespaceV2()); consumer.setUseTLS(consumerProperties.getUseTLS()); consumer.setPullTimeDelayMillsWhenException( consumerProperties.getPullTimeDelayMillsWhenException()); @@ -162,6 +169,7 @@ public static DefaultLitePullConsumer initPullConsumer( consumer.setPullThresholdForAll(extendedConsumerProperties.getExtension() .getPull().getPullThresholdForAll()); consumer.setUnitName(consumerProperties.getUnitName()); + consumer.setAccessChannel(AccessChannel.valueOf(consumerProperties.getAccessChannel())); return consumer; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java index 19aa729f1f..cb14cbc640 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/integration/outbound/RocketMQProduceFactory.java @@ -24,6 +24,7 @@ import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.hook.CheckForbiddenHook; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -78,11 +79,12 @@ public static DefaultMQProducer initRocketMQProducer(String topic, if (RocketMQProducerProperties.ProducerType.Trans .equalsName(producerProperties.getProducerType())) { producer = new TransactionMQProducer(producerProperties.getNamespace(), - producerProperties.getGroup(), rpcHook); + producerProperties.getGroup(), rpcHook, producerProperties.getEnableMsgTrace(), + producerProperties.getCustomizedTraceTopic()); if (producerProperties.getEnableMsgTrace()) { try { AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher( - producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE, + producerProperties.getGroup(), TraceDispatcher.Type.PRODUCE, 10, producerProperties.getCustomizedTraceTopic(), rpcHook); dispatcher.setHostProducer(producer.getDefaultMQProducerImpl()); Field field = DefaultMQProducer.class @@ -110,6 +112,7 @@ public static DefaultMQProducer initRocketMQProducer(String topic, producer.setInstanceName( RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid())); producer.setNamesrvAddr(producerProperties.getNameServer()); + producer.setNamespaceV2(producerProperties.getNamespaceV2()); producer.setSendMsgTimeout(producerProperties.getSendMsgTimeout()); producer.setRetryTimesWhenSendFailed( producerProperties.getRetryTimesWhenSendFailed()); @@ -122,6 +125,7 @@ public static DefaultMQProducer initRocketMQProducer(String topic, producer.setMaxMessageSize(producerProperties.getMaxMessageSize()); producer.setUseTLS(producerProperties.getUseTLS()); producer.setUnitName(producerProperties.getUnitName()); + producer.setAccessChannel(AccessChannel.valueOf(producerProperties.getAccessChannel())); CheckForbiddenHook checkForbiddenHook = RocketMQBeanContainerCache.getBean( producerProperties.getCheckForbiddenHook(), CheckForbiddenHook.class); if (null != checkForbiddenHook) { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java index abe7a57d83..84b69cbde7 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQCommonProperties.java @@ -56,6 +56,8 @@ public class RocketMQCommonProperties implements Serializable { private String namespace; + private String namespaceV2; + /** * The property of "unitName". */ @@ -140,6 +142,14 @@ public void setNamespace(String namespace) { this.namespace = namespace; } + public String getNamespaceV2() { + return namespaceV2; + } + + public void setNamespaceV2(String namespaceV2) { + this.namespaceV2 = namespaceV2; + } + public String getAccessChannel() { return accessChannel; } diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java index 5bd6b22c96..7a74a4499c 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/properties/RocketMQConsumerProperties.java @@ -304,6 +304,12 @@ public static class Push implements Serializable { */ private int consumeMessageBatchMaxSize = 1; + /** + * Maximum amount of time in minutes a message may block the consuming thread. + * Unit: Minutes + */ + private long consumeTimeout = 15; + public boolean getOrderly() { return orderly; } @@ -368,6 +374,13 @@ public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) { this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; } + public long getConsumeTimeout() { + return consumeTimeout; + } + + public void setConsumeTimeout(long consumeTimeout) { + this.consumeTimeout = consumeTimeout; + } } public static class Pull implements Serializable { diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java index c3513b5da3..c977b8b17f 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/utils/RocketMQUtils.java @@ -57,6 +57,9 @@ public static T mergeRocketMQProperties( if (StringUtils.isEmpty(mqProperties.getNamespace())) { mqProperties.setNamespace(binderConfigurationProperties.getNamespace()); } + if (StringUtils.isEmpty(mqProperties.getNamespaceV2())) { + mqProperties.setNamespaceV2(binderConfigurationProperties.getNamespaceV2()); + } if (StringUtils.isEmpty(mqProperties.getGroup())) { mqProperties.setGroup(binderConfigurationProperties.getGroup()); }