public class StompSubscribeTest {
@Test
public void testSub() throws Exception {
StompClientManager stompClientManager = new StompClientManager();
stompClientManager.connect("stomp://username:password@host:port");
stompClientManager.createConsumer("/topic/logon")
.id("wqteam_test")
.ackMode(AckMode.AUTO)
.handler(new MessageHandler() {
@Override
public void onMessage(Message message) {
System.out.println(message);
}
})
.subscribe();
LockSupport.park();
}
}
public class StompSenderTest {
@Test
public void testSend() throws Exception {
StompClientManager stompClientManager = new StompClientManager();
stompClientManager.connect("stomp://username:password@10.232.136.85:61613");
MessageProducer producer = stompClientManager.createProducer("/topic/logon3");
for (int i = 0; i < 100; i++) {
if (i > 5) {
StompTransaction tx = producer.begin();
Thread.sleep(1000);
tx.send(i + "风格的歌");
tx.abort();
} else {
producer.send(i + "大苏打");
}
try {
ReceiptFuture future = producer.sendW((i + "大苏打").getBytes("UTF-8"));
future.await();
} catch (Exception e) {
e.printStackTrace();
}
}
LockSupport.park();
}
}
<bean id="stompClientManager" class="com.ohun.stomp.StompClientManager"
init-method="start" destroy-method="stop">
<!--<property name="uri"
value="stomp://username:password@10.232.136.85:61613?connectTimeout=3000"/>
两种配置方式是一样的,看个人喜好-->
<property name="config">
<bean class="com.ohun.stomp.common.StompConfig">
<property name="host" value="${wangxin.mq.stomp.host}"/>
<property name="port" value="${wangxin.mq.stomp.port}"/>
<property name="login" value="${wangxin.stomp.username}"/>
<property name="pass" value="${wangxin.stomp.password}"/>
<property name="connectTimeout" value="3000"/>
<property name="heartbeatX" value="600000"/>
<property name="heartbeatY" value="100000"/>
<property name="monitorPeriod" value="60"/>
<property name="connectCountPreHost" value="1"/>
</bean>
</property>
</bean>
<bean id="wx2PubMsgListener" class="com.ohun.test.WX2PublicMsgListener"
init-method="init" destroy-method="destroy">
<property name="topic" value="/topic/pamsgfromwx"/>
<property name="clientId" value="wxadmin"/>
</bean>
public class WX2PublicMsgListener implements MessageHandler {
@Resource
private StompClientManager stompClientManager;
public void init() {
this.executor = newExecutor();
MessageConsumer consumer = stompClientManager.createConsumer(topic);
consumer.id(clientId).executor(executor).handler(this).subscribe();
}
@Override
public void onMessage(final Message message) {
logger.error(message.getTextBody())
}
public void destroy() throws Exception {
executor.shutdown();
}
private ThreadPoolExecutor newExecutor() {
final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
.daemon(true).namingPattern("wx-2-pub-%d").build();
return new ThreadPoolExecutor(2, poolSize, 5L, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(queueSize),
threadFactory,
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
logger.warn("one message task was rejected total="
+ rejectedCount.incrementAndGet()
+ ",poolStatus=" + poolStatus());
}
});
}
private final Logger logger = LoggerFactory.getLogger(WX2PublicMsgListener.class);
private AtomicInteger rejectedCount = new AtomicInteger(0);
private int poolSize = 10, queueSize = 100;
private ThreadPoolExecutor executor;
private String topic;
private String clientId;
}