Skip to content

ohun/stomp-client

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 

Repository files navigation

A STOMP client based on Netty.

Subscribe:

/**
 * Usage example: connect to a STOMP broker, subscribe to a topic and print
 * every message that is delivered to the handler.
 */
public class StompSubscribeTest {
    @Test
    public void testSub() throws Exception {
        StompClientManager manager = new StompClientManager();
        manager.connect("stomp://username:password@host:port");

        // Handler that writes each received message to standard output.
        MessageHandler printer = new MessageHandler() {
            @Override
            public void onMessage(Message message) {
                System.out.println(message);
            }
        };

        // Build the consumer fluently: subscription id, ack mode, handler, then subscribe.
        manager.createConsumer("/topic/logon")
                .id("wqteam_test")
                .ackMode(AckMode.AUTO)
                .handler(printer)
                .subscribe();

        // Block the calling thread (presumably so the subscription can keep receiving).
        LockSupport.park();
    }
}

Sender:

/**
 * Usage example: connect to a STOMP broker and publish 100 rounds of messages.
 * Rounds 0-5 use a plain send; later rounds send inside a transaction that is
 * then aborted. Every round additionally performs a {@code sendW} and waits on
 * the returned {@code ReceiptFuture}.
 */
public class StompSenderTest {
    @Test
    public void testSend() throws Exception {
        StompClientManager manager = new StompClientManager();
        manager.connect("stomp://username:password@10.232.136.85:61613");
        MessageProducer sender = manager.createProducer("/topic/logon3");

        int round = 0;
        while (round < 100) {
            if (round <= 5) {
                // First six rounds: plain, non-transactional send.
                sender.send(round + "大苏打");
            } else {
                // Remaining rounds: begin a transaction, send within it, then abort it.
                StompTransaction tx = sender.begin();
                Thread.sleep(1000);
                tx.send(round + "风格的歌");
                tx.abort();
            }

            try {
                // Send the UTF-8 encoded payload and wait on the returned future.
                byte[] payload = (round + "大苏打").getBytes("UTF-8");
                ReceiptFuture receipt = sender.sendW(payload);
                receipt.await();
            } catch (Exception e) {
                e.printStackTrace();
            }

            round++;
        }

        // Block the calling thread after all rounds have been issued.
        LockSupport.park();
    }
}

For Spring:

<!-- StompClientManager bean: Spring calls its "start" method on initialisation
     and its "stop" method on destruction. -->
<bean id="stompClientManager" class="com.ohun.stomp.StompClientManager"
            init-method="start" destroy-method="stop">
    <!-- Alternative: configure through a single "uri" property instead of the
         "config" bean below:
         <property name="uri"
         value="stomp://username:password@10.232.136.85:61613?connectTimeout=3000"/>
         The two configuration styles are equivalent; use whichever you prefer. -->
    <property name="config">
        <!-- NOTE(review): units of connectTimeout / heartbeatX / heartbeatY /
             monitorPeriod are not visible here; confirm against StompConfig. -->
        <bean class="com.ohun.stomp.common.StompConfig">
            <property name="host" value="${wangxin.mq.stomp.host}"/>
            <property name="port" value="${wangxin.mq.stomp.port}"/>
            <property name="login" value="${wangxin.stomp.username}"/>
            <property name="pass" value="${wangxin.stomp.password}"/>
            <property name="connectTimeout" value="3000"/>
            <property name="heartbeatX" value="600000"/>
            <property name="heartbeatY" value="100000"/>
            <property name="monitorPeriod" value="60"/>
            <property name="connectCountPreHost" value="1"/>
        </bean>
    </property>
</bean>

<!-- Message listener bean: Spring calls its "init" method on initialisation and
     its "destroy" method on destruction; "topic" and "clientId" are injected
     as bean properties. -->
<bean id="wx2PubMsgListener" class="com.ohun.test.WX2PublicMsgListener"
                  init-method="init" destroy-method="destroy">
    <property name="topic" value="/topic/pamsgfromwx"/>
    <property name="clientId" value="wxadmin"/>
</bean>
public class WX2PublicMsgListener implements MessageHandler {

    @Resource
    private StompClientManager stompClientManager;

    public void init() {
        this.executor = newExecutor();
        MessageConsumer consumer = stompClientManager.createConsumer(topic);
        consumer.id(clientId).executor(executor).handler(this).subscribe();
    }

    @Override
    public void onMessage(final Message message) {
          logger.error(message.getTextBody())
    }

    public void destroy() throws Exception {
        executor.shutdown();
    }

    private ThreadPoolExecutor newExecutor() {
        final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
                    .daemon(true).namingPattern("wx-2-pub-%d").build();
        return new ThreadPoolExecutor(2, poolSize, 5L, TimeUnit.MINUTES,
                    new LinkedBlockingQueue<Runnable>(queueSize),
                    threadFactory,
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                            logger.warn("one message task was rejected total="
                                    + rejectedCount.incrementAndGet()
                                    + ",poolStatus=" + poolStatus());
                        }
                });
    }

    private final Logger logger = LoggerFactory.getLogger(WX2PublicMsgListener.class);
    private AtomicInteger rejectedCount = new AtomicInteger(0);
    private int poolSize = 10, queueSize = 100;
    private ThreadPoolExecutor executor;

    private String topic;

    private String clientId;
}

About

STOMP client for Java, based on Netty

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages