Skip to content

Commit

Permalink
feat: add message log
Browse files Browse the repository at this point in the history
  • Loading branch information
tuya-qiufeng committed Jul 7, 2023
1 parent 132c5a4 commit fc9f240
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import com.alibaba.fastjson.JSON;
import com.tuya.open.sdk.mq.AESBase64Utils;

import com.tuya.open.sdk.mq.MqConfigs;
import com.tuya.open.sdk.mq.MqConsumer;
import org.apache.pulsar.client.api.MessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,11 +19,15 @@ public class ConsumerExample {
public static void main(String[] args) throws Exception {
MqConsumer mqConsumer = MqConsumer.build().serviceUrl(URL).accessId(ACCESS_ID).accessKey(ACCESS_KEY)
.messageListener(message -> {
System.out.println("---------------------------------------------------");
System.out.println("Message received:" + new String(message.getData()) + ",time="
+ message.getPublishTime() + ",consumed time=" + System.currentTimeMillis());
MessageId msgId = message.getMessageId();
String tid = message.getProperty("tid");
long publishTime = message.getPublishTime();
String payload = new String(message.getData());
logger.debug("###TUYA_PULSAR_MSG => start process message, messageId={}, publishTime={}, tid={}, payload={}",
msgId, publishTime, tid, payload);
payloadHandler(payload);
logger.debug("###TUYA_PULSAR_MSG => finish process message, messageId={}, publishTime={}, tid={}",
msgId, publishTime, tid);
});
mqConsumer.start();
}
Expand Down
18 changes: 14 additions & 4 deletions open-mq-sdk/src/main/java/com/tuya/open/sdk/mq/MqConsumer.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@

package com.tuya.open.sdk.mq;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -58,6 +55,7 @@ public MqConsumer messageListener(IMessageListener messageListener) {
* @throws Exception
*/
public void start() throws Exception {
logger.debug("###TUYA_PULSAR_MSG => start initial pulsar consumer");
if (serviceUrl == null || serviceUrl.trim().length() == 0) {
throw new IllegalStateException("serviceUrl must be initialized");
}
Expand All @@ -75,15 +73,27 @@ public void start() throws Exception {
Consumer consumer = client.newConsumer().topic(String.format("%s/out/%s", accessId, env.getValue()))
.subscriptionName(String.format("%s-sub", accessId)).subscriptionType(SubscriptionType.Failover)
.autoUpdatePartitions(Boolean.FALSE).subscribe();
logger.debug("###TUYA_PULSAR_MSG => pulsar consumer initial success");
do {
MessageId msgId = null;
String tid = null;
try {
logger.debug("###TUYA_PULSAR_MSG => start receive next message");
Message message = consumer.receive();
msgId = message.getMessageId();
tid = message.getProperty("tid");
long publishTime = message.getPublishTime();
logger.debug("###TUYA_PULSAR_MSG => message received, messageId={}, publishTime={}, tid={}", msgId, publishTime, tid);

Long s = System.currentTimeMillis();
messageListener.onMessageArrived(message);
if (MqConfigs.DEBUG) {
logger.info("business processing cost={}", System.currentTimeMillis() - s);
}

logger.debug("###TUYA_PULSAR_MSG => start message ack, messageId={}, publishTime={}, tid={}", msgId, publishTime, tid);
consumer.acknowledge(message);
logger.debug("###TUYA_PULSAR_MSG => message ack success, messageId={}, publishTime={}, tid={}", msgId, publishTime, tid);
} catch (Throwable t) {
logger.error("error:", t);
}
Expand Down

0 comments on commit fc9f240

Please sign in to comment.