使用方式QMQ- https://github.com/qunarcorp/qmqSpringBootStarterforQMQ- https://gitee.com/wjtree/qmq-spring-boot-starter引入Maven依赖(已上传到中央仓库)<dependency> <groupId>xin.wjtree.qmq</groupId> <artifactId>qmq-spring-boot-starter</artifactId> <version>1.0.0</version></dependency>添加SpringBoot配置(YML)spring: application: name:qmq-demo qmq: #应用标识appcode,必填 app-code:qmq-demo #服务器地址metaserver,必填 meta-server:https://127.0.0.1:8080/meta/address #生产者配置,发送消息的线程池的设置,选填 producer: #发送线程数,默认3 send-threads:3 #默认每次发送时最大批量大小,默认30 send-batch:30 #如果消息发送失败,重试次数,默认10 send-try-count:10 #异步发送队列大小,默认10000 max-queue-size:10000 #使用QmqTemplate发送消息的默认主题,默认值default_subject template: default-subject:default_subject #消费者配置,消费消息的线程池的设置,选填 consumer: #线程名称前缀,默认qmq-process thread-name-prefix:qmq-process #线程池大小,默认2 core-pool-size:2 #最大线程池大小,默认2 max-pool-size:2 #线程池队列大小,默认1000 queue-capacity:1000 #消息主题和分组配置,选填 #使用QmqConsumer注解时,可使用SpEL表达式引入以下主题和分组 subject: sub1:sub1 sub2:sub2 sub3:sub3 #moresubject... group: group1:group1 group2:group2 group3:group3 #moregroup...logging: level: #设置qmq-spring-boot-starter的日志级别 xin.wjtree.qmq:traceserver: port:8989发送消息importorg.junit.Test;importorg.junit.runner.RunWith;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.test.context.junit4.SpringRunner;importqunar.tc.qmq.Message;importqunar.tc.qmq.MessageSendStateListener;importxin.wjtree.qmq.QmqTemplate;importxin.wjtree.qmq.autoconfigure.QmqProperties;importxin.wjtree.qmq.constant.QmqTimeUnit;importxin.wjtree.qmq.internal.QmqAlias;importxin.wjtree.qmq.internal.QmqIgnore;importjavax.annotation.Resource;importjava.math.BigDecimal;importjava.text.ParseException;importjava.text.SimpleDateFormat;importjava.util.Date;importjava.util.concurrent.CountDownLatch;@RunWith(SpringRunner.class)@SpringBootTestpublicclassQmqTest{ @Resource privateQmqTemplatetemplate; @Resource privateQmqPropertiesproperties; /** *发送即时消息 *@throwsInterruptedException */ @Test publicvoidsendImmediate()throwsInterruptedException{ //计数器,执行1次结束 CountDownLatchlatch=newCountDownLatch(1); //一般使用template.send(properties.getSubject().get("sub1"),getUser())即可 template.withSendStateListener(newMessageSendStateListener(){ @Override publicvoidonSuccess(Messagem){ latch.countDown(); } @Override publicvoidonFailed(Messagem){ latch.countDown(); } }).send(properties.getSubject().get("sub1"),getUser()); //计数器减1 latch.await(); } /** *发送延时消息 *@throwsInterruptedException */ @Test publicvoidsendDelay()throwsInterruptedException{ //计数器,执行1次结束 CountDownLatchlatch=newCountDownLatch(1); //延时10秒发送消息 //一般使用template.sendDelay(properties.getSubject().get("sub1"),getUser(),QmqTimeUnit.TEN_SECONDS)即可 template.withSendStateListener(newMessageSendStateListener(){ @Override publicvoidonSuccess(Messagem){ latch.countDown(); } @Override publicvoidonFailed(Messagem){ latch.countDown(); } }).sendDelay(properties.getSubject().get("sub1"),getUser(),QmqTimeUnit.TEN_SECONDS); //计数器减1 latch.await(); } /** *发送定时消息 *@throwsInterruptedException */ @Test publicvoidsendSchedule()throwsInterruptedException,ParseException{ //计数器,执行1次结束 CountDownLatchlatch=newCountDownLatch(1); //定时发送的日期时间 Datedate=newSimpleDateFormat("yyyy-MM-ddHH声明:本文仅代表作者观点,不代表本站立场。如果侵犯到您的合法权益,请联系我们删除侵权资源!如果遇到资源链接失效,请您通过评论或工单的方式通知管理员。未经允许,不得转载,本站所有资源文章禁止商业使用运营!
下载安装【程序员客栈】APP
实时对接需求、及时收发消息、丰富的开放项目需求、随时随地查看项目状态
评论