在2018-3-1
日SpringBoot
官方发版了2.0.0.RELEASE
最新版本,新版本完全基于Spring5.0
来构建,JDK
最低支持也从原来的1.6
也改成了1.8
,不再兼容1.8
以下的版本,更多新特性请查看官方文档。
本章目标
基于SpringBoot
整合RabbitMQ
完成消息延迟消费。
构建项目
注意前言
由于SpringBoot
的内置扫描机制,我们如果不自动配置扫描路径,请保持下面rabbitmq-common
模块内的配置可以被SpringBoot
扫描到,否则不会自动创建队列,控制台会输出404的错误信息。
我们本章采用2.0.0.RELEASE
版本的SpringBoot
,添加相关的依赖如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> <relativePath/> </parent> ...... <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.40</version> </dependency> </dependencies> ......
|
我们仍然采用多模块的方式来测试队列的Provider
以及Consumer
。
队列公共模块
我们先来创建一个名为rabbitmq-common
公共依赖模块(Create New Maven Module)
在公共模块内添加一个QueueEnum
队列枚举配置,该枚举内配置队列的Exchange
、QueueName
、RouteKey
等相关内容,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package com.hengyu.rabbitmq.lazy.enums;
import lombok.Getter;
@Getter public enum QueueEnum {
MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
private String exchange;
private String name;
private String routeKey;
QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
|
可以看到MESSAGE_QUEUE
队列配置跟我们之前章节的配置一样,而我们另外新创建了一个后缀为ttl
的消息队列配置。我们采用的这种方式是RabbitMQ
消息队列其中一种的延迟消费模块,通过配置队列消息过期后转发的形式。
这种模式比较简单,我们需要将消息先发送到ttl
延迟队列内,当消息到达过期时间后会自动转发到ttl
队列内配置的转发Exchange
以及RouteKey
绑定的队列内完成消息消费。
下面我们来模拟消息通知
的延迟消费场景,先来创建一个名为MessageRabbitMqConfiguration
的队列配置类,该配置类内添加消息通知队列
配置以及消息通过延迟队列
配置,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
@Configuration public class MessageRabbitMqConfiguration {
@Bean DirectExchange messageDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.MESSAGE_QUEUE.getExchange()) .durable(true) .build(); }
@Bean DirectExchange messageTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange()) .durable(true) .build(); }
@Bean public Queue messageQueue() { return new Queue(QueueEnum.MESSAGE_QUEUE.getName()); }
@Bean Queue messageTtlQueue() { return QueueBuilder .durable(QueueEnum.MESSAGE_TTL_QUEUE.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange()) .withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey()) .build(); }
@Bean Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) { return BindingBuilder .bind(messageQueue) .to(messageDirect) .with(QueueEnum.MESSAGE_QUEUE.getRouteKey()); }
@Bean public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) { return BindingBuilder .bind(messageTtlQueue) .to(messageTtlDirect) .with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey()); } }
|
我们声明了消息通知队列
的相关Exchange
、Queue
、Binding
等配置,将message.center.create
队列通过路由键message.center.create
绑定到了message.center.direct
交换上。
除此之外,我们还添加了消息通知延迟队列
的Exchange
、Queue
、Binding
等配置,将message.center.create.ttl
队列通过message.center.create.ttl
路由键绑定到了message.center.topic.ttl
交换上。
我们仔细来看看messageTtlQueue
延迟队列的配置,跟messageQueue
队列配置不同的地方这里多出了x-dead-letter-exchange
、x-dead-letter-routing-key
两个参数,而这两个参数就是配置延迟队列过期后转发的Exchange
、RouteKey
,只要在创建队列时对应添加了这两个参数,在RabbitMQ
管理平台看到的队列配置就不仅是单纯的Direct
类型的队列类型,如下图所示:
在上图内我们可以看到message.center.create.ttl
队列多出了DLX
、DLK
的配置,这就是RabbitMQ
内死信交换
的标志。
满足死信交换
的条件,在官方文档中表示:
Messages from a queue can be ‘dead-lettered’; that is, republished to another exchange when any of the following events occur:
The message is rejected (basic.reject or basic.nack) with requeue=false,
The TTL for the message expires; or
The queue length limit is exceeded.
- 该消息被拒绝(basic.reject或 basic.nack),requeue = false
- 消息的TTL过期
- 队列长度限制已超出
官方文档地址
我们需要满足上面的其中一种方式就可以了,我们采用满足第二个条件,采用过期的方式。
队列消息提供者
我们再来创建一个名为rabbitmq-lazy-provider
的模块(Create New Maven Module),并且在pom.xml
配置文件内添加rabbitmq-common
模块的依赖,如下所示:
1 2 3 4 5 6
| <dependency> <groupId>com.hengyu</groupId> <artifactId>rabbitmq-common</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
|
配置队列
在resource
下创建一个名为application.yml
的配置文件,在该配置文件内添加如下配置信息:
1 2 3 4 5 6 7 8 9
| spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /hengboy publisher-confirms: true
|
消息提供者类
接下来我们来创建名为MessageProvider
消息提供者类,用来发送消息内容到消息通知延迟队列,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
@Component public class MessageProvider {
static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
@Autowired private AmqpTemplate rabbitMqTemplate;
public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) { if (!StringUtils.isEmpty(exchange)) { logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent)); rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> { message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; }); } else { logger.error("未找到队列消息:{},所属的交换机", exchange); } } }
|
由于我们在 pom.xml
配置文件内添加了RabbitMQ
相关的依赖并且在上面application.yml
文件内添加了对应的配置,SpringBoot
为我们自动实例化了AmqpTemplate
,该实例可以发送任何类型的消息到指定队列。
我们采用convertAndSend
方法,将消息内容发送到指定Exchange
、RouterKey
队列,并且通过setExpiration
方法设置过期时间,单位:毫秒。
编写发送测试
我们在test
目录下创建一个测试类,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @RunWith(SpringRunner.class) @SpringBootTest(classes = RabbitMqLazyProviderApplication.class) public class RabbitMqLazyProviderApplicationTests {
@Autowired private MessageProvider messageProvider;
@Test public void testLazy() { messageProvider.sendMessage("测试延迟消费,写入时间:" + new Date(), QueueEnum.MESSAGE_TTL_QUEUE.getExchange(), QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(), 10000); } }
|
注意:@SpringBootTest
注解内添加了classes
入口类的配置,因为我们是模块创建的项目并不是默认创建的SpringBoot
项目,这里需要配置入口程序类才可以运行测试。
在测试类我们注入了MessageProvider
消息提供者,调用sendMessage
方法发送消息到消息通知延迟队列
,并且设置延迟的时间为10秒
,这里衡量发送到指定队列的标准是要看MessageRabbitMqConfiguration
配置类内的相关Binding
配置,通过Exchange
、RouterKey
值进行发送到指定的队列。
到目前为止我们的rabbitmq-lazy-provider
消息提供模块已经编写完成了,下面我们来看看消息消费者模块。
队列消息消费者
我们再来创建一个名为rabbitmq-lazy-consumer
的模块(Create New Maven Module),同样需要在pom.xml
配置文件内添加rabbitmq-common
模块的依赖,如下所示:
1 2 3 4 5 6
| <dependency> <groupId>com.hengyu</groupId> <artifactId>rabbitmq-common</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
|
当然同样需要在resource
下创建application.yml
并添加消息队列的相关配置,代码就不贴出来了,可以直接从rabbitmq-lazy-provider
模块中复制application.yml
文件到当前模块内。
消息消费者类
接下来创建一个名为MessageConsumer
的消费者类,该类需要监听消息通知队列
,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
@Component @RabbitListener(queues = "message.center.create") public class MessageConsumer {
static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@RabbitHandler public void handler(String content) { logger.info("消费内容:{}", content); } }
|
在@RabbitListener
注解内配置了监听的队列,这里配置内容是QueueEnum
枚举内的queueName
属性值,当然如果你采用常量的方式在注解属性上是直接可以使用的,枚举不支持这种配置,这里只能把QueueName
字符串配置到queues
属性上了。
由于我们在消息发送时采用字符串的形式发送消息内容,这里在@RabbitHandler
处理方法的参数内要保持数据类型一致!
消费者入口类
我们为消费者模块添加一个入口程序类,用于启动消费者,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
@SpringBootApplication public class RabbitMqLazyConsumerApplication {
public static void main(String[] args) { SpringApplication.run(RabbitMqLazyConsumerApplication.class, args); } }
|
测试
我们的代码已经编写完毕,下面来测试下是否完成了我们预想的效果,步骤如下所示:
1 2 3
| 1. 启动消费者模块 2. 执行RabbitMqLazyProviderApplicationTests.testLazy()方法进行发送消息到通知延迟队列 3. 查看消费者模块控制台输出内容
|
我们可以在消费者模块控制台看到输出内容:
1
| 2018-03-04 10:10:34.765 INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer : 消费内容:测试延迟消费,写入时间:Sun Mar 04 10:10:24 CST 2018
|
我们在提供者测试方法发送消息的时间为10:10:24
,而真正消费的时间则为10:10:34
,与我们预计的一样,消息延迟了10秒
后去执行消费。
总结
终上所述我们完成了消息队列的延迟消费
,采用死信
方式,通过消息过期方式触发,在实际项目研发过程中,延迟消费还是很有必要的,可以省去一些定时任务的配置。