消息队列RabbitMQ消息延时消费


2018-3-1SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更多新特性请查看官方文档

本章目标

基于SpringBoot整合RabbitMQ完成消息延迟消费。

构建项目

注意前言

由于SpringBoot的内置扫描机制,我们如果不自动配置扫描路径,请保持下面rabbitmq-common模块内的配置可以被SpringBoot扫描到,否则不会自动创建队列,控制台会输出404的错误信息。

我们本章采用2.0.0.RELEASE版本的SpringBoot,添加相关的依赖如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies>
<!--rabbbitMQ相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--spring boot tester-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--fast json依赖-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>
</dependencies>
......

我们仍然采用多模块的方式来测试队列的Provider以及Consumer

队列公共模块

我们先来创建一个名为rabbitmq-common公共依赖模块(Create New Maven Module)
在公共模块内添加一个QueueEnum队列枚举配置,该枚举内配置队列的ExchangeQueueNameRouteKey等相关内容,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.hengyu.rabbitmq.lazy.enums;

import lombok.Getter;

/**
* 消息队列枚举配置
*
* @author:于起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:33
* 简书:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@Getter
public enum QueueEnum {
/**
* 消息通知队列
*/
MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
/**
* 消息通知ttl队列
*/
MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
/**
* 交换名称
*/
private String exchange;
/**
* 队列名称
*/
private String name;
/**
* 路由键
*/
private String routeKey;

QueueEnum(String exchange, String name, String routeKey) {
this.exchange = exchange;
this.name = name;
this.routeKey = routeKey;
}
}

可以看到MESSAGE_QUEUE队列配置跟我们之前章节的配置一样,而我们另外新创建了一个后缀为ttl的消息队列配置。我们采用的这种方式是RabbitMQ消息队列其中一种的延迟消费模块,通过配置队列消息过期后转发的形式。

这种模式比较简单,我们需要将消息先发送到ttl延迟队列内,当消息到达过期时间后会自动转发到ttl队列内配置的转发Exchange以及RouteKey绑定的队列内完成消息消费。

下面我们来模拟消息通知的延迟消费场景,先来创建一个名为MessageRabbitMqConfiguration的队列配置类,该配置类内添加消息通知队列配置以及消息通过延迟队列配置,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 消息通知 - 消息队列配置信息
*
* @author:恒宇少年 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:32
* 简书:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@Configuration
public class MessageRabbitMqConfiguration {
/**
* 消息中心实际消费队列交换配置
*
* @return
*/
@Bean
DirectExchange messageDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
.durable(true)
.build();
}

/**
* 消息中心延迟消费交换配置
*
* @return
*/
@Bean
DirectExchange messageTtlDirect() {
return (DirectExchange) ExchangeBuilder
.directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
.durable(true)
.build();
}

/**
* 消息中心实际消费队列配置
*
* @return
*/
@Bean
public Queue messageQueue() {
return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
}


/**
* 消息中心TTL队列
*
* @return
*/
@Bean
Queue messageTtlQueue() {
return QueueBuilder
.durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
.build();
}

/**
* 消息中心实际消息交换与队列绑定
*
* @param messageDirect 消息中心交换配置
* @param messageQueue 消息中心队列
* @return
*/
@Bean
Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
return BindingBuilder
.bind(messageQueue)
.to(messageDirect)
.with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
}

/**
* 消息中心TTL绑定实际消息中心实际消费交换机
*
* @param messageTtlQueue
* @param messageTtlDirect
* @return
*/
@Bean
public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
return BindingBuilder
.bind(messageTtlQueue)
.to(messageTtlDirect)
.with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
}
}

我们声明了消息通知队列的相关ExchangeQueueBinding等配置,将message.center.create队列通过路由键message.center.create绑定到了message.center.direct交换上。

除此之外,我们还添加了消息通知延迟队列ExchangeQueueBinding等配置,将message.center.create.ttl队列通过message.center.create.ttl路由键绑定到了message.center.topic.ttl交换上。

我们仔细来看看messageTtlQueue延迟队列的配置,跟messageQueue队列配置不同的地方这里多出了x-dead-letter-exchangex-dead-letter-routing-key两个参数,而这两个参数就是配置延迟队列过期后转发的ExchangeRouteKey,只要在创建队列时对应添加了这两个参数,在RabbitMQ管理平台看到的队列配置就不仅是单纯的Direct类型的队列类型,如下图所示:
队列类型差异

在上图内我们可以看到message.center.create.ttl队列多出了DLXDLK的配置,这就是RabbitMQ死信交换的标志。
满足死信交换的条件,在官方文档中表示:

Messages from a queue can be ‘dead-lettered’; that is, republished to another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with requeue=false,
The TTL for the message expires; or
The queue length limit is exceeded.

  • 该消息被拒绝(basic.reject或 basic.nack),requeue = false
  • 消息的TTL过期
  • 队列长度限制已超出
    官方文档地址

我们需要满足上面的其中一种方式就可以了,我们采用满足第二个条件,采用过期的方式。

队列消息提供者

我们再来创建一个名为rabbitmq-lazy-provider的模块(Create New Maven Module),并且在pom.xml配置文件内添加rabbitmq-common模块的依赖,如下所示:

1
2
3
4
5
6
<!--添加公共模块依赖-->
<dependency>
<groupId>com.hengyu</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

配置队列

resource下创建一个名为application.yml的配置文件,在该配置文件内添加如下配置信息:

1
2
3
4
5
6
7
8
9
spring:
#rabbitmq消息队列配置信息
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /hengboy
publisher-confirms: true

消息提供者类

接下来我们来创建名为MessageProvider消息提供者类,用来发送消息内容到消息通知延迟队列,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 消息通知 - 提供者
*
* @author:于起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:40
* 简书:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@Component
public class MessageProvider {
/**
* logger instance
*/
static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
/**
* RabbitMQ 模版消息实现类
*/
@Autowired
private AmqpTemplate rabbitMqTemplate;

/**
* 发送延迟消息
*
* @param messageContent 消息内容
* @param exchange 队列交换
* @param routerKey 队列交换绑定的路由键
* @param delayTimes 延迟时长,单位:毫秒
*/
public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
if (!StringUtils.isEmpty(exchange)) {
logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
// 执行发送消息到指定队列
rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
// 设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
} else {
logger.error("未找到队列消息:{},所属的交换机", exchange);
}
}
}

由于我们在 pom.xml配置文件内添加了RabbitMQ相关的依赖并且在上面application.yml文件内添加了对应的配置,SpringBoot为我们自动实例化了AmqpTemplate,该实例可以发送任何类型的消息到指定队列。
我们采用convertAndSend 方法,将消息内容发送到指定ExchangeRouterKey队列,并且通过setExpiration方法设置过期时间,单位:毫秒。

编写发送测试

我们在test目录下创建一个测试类,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {
/**
* 消息队列提供者
*/
@Autowired
private MessageProvider messageProvider;

/**
* 测试延迟消息消费
*/
@Test
public void testLazy() {
// 测试延迟10秒
messageProvider.sendMessage("测试延迟消费,写入时间:" + new Date(),
QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),
QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),
10000);
}
}

注意:@SpringBootTest注解内添加了classes入口类的配置,因为我们是模块创建的项目并不是默认创建的SpringBoot项目,这里需要配置入口程序类才可以运行测试。

在测试类我们注入了MessageProvider 消息提供者,调用sendMessage方法发送消息到消息通知延迟队列,并且设置延迟的时间为10秒,这里衡量发送到指定队列的标准是要看MessageRabbitMqConfiguration配置类内的相关Binding配置,通过ExchangeRouterKey值进行发送到指定的队列。

到目前为止我们的rabbitmq-lazy-provider消息提供模块已经编写完成了,下面我们来看看消息消费者模块。

队列消息消费者

我们再来创建一个名为rabbitmq-lazy-consumer的模块(Create New Maven Module),同样需要在pom.xml配置文件内添加rabbitmq-common模块的依赖,如下所示:

1
2
3
4
5
6
<!--添加公共模块依赖-->
<dependency>
<groupId>com.hengyu</groupId>
<artifactId>rabbitmq-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>

当然同样需要在resource下创建application.yml并添加消息队列的相关配置,代码就不贴出来了,可以直接从rabbitmq-lazy-provider模块中复制application.yml文件到当前模块内。

消息消费者类

接下来创建一个名为MessageConsumer的消费者类,该类需要监听消息通知队列,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 消息通知 - 消费者
*
* @author:于起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午5:00
* 简书:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {
/**
* logger instance
*/
static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@RabbitHandler
public void handler(String content) {
logger.info("消费内容:{}", content);
}
}

@RabbitListener注解内配置了监听的队列,这里配置内容是QueueEnum枚举内的queueName属性值,当然如果你采用常量的方式在注解属性上是直接可以使用的,枚举不支持这种配置,这里只能把QueueName字符串配置到queues属性上了。
由于我们在消息发送时采用字符串的形式发送消息内容,这里在@RabbitHandler处理方法的参数内要保持数据类型一致!

消费者入口类

我们为消费者模块添加一个入口程序类,用于启动消费者,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 【第四十六章:SpringBoot & RabbitMQ完成消息延迟消费】
* 队列消费者模块 - 入口程序类
*
* @author:于起宇 <br/>
* ===============================
* Created with IDEA.
* Date:2018/3/3
* Time:下午4:55
* 简书:http://www.jianshu.com/u/092df3f77bca
* ================================
*/
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);
}
}

测试

我们的代码已经编写完毕,下面来测试下是否完成了我们预想的效果,步骤如下所示:

1
2
3
1. 启动消费者模块
2. 执行RabbitMqLazyProviderApplicationTests.testLazy()方法进行发送消息到通知延迟队列
3. 查看消费者模块控制台输出内容

我们可以在消费者模块控制台看到输出内容:

1
2018-03-04 10:10:34.765  INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer      : 消费内容:测试延迟消费,写入时间:Sun Mar 04 10:10:24 CST 2018

我们在提供者测试方法发送消息的时间为10:10:24,而真正消费的时间则为10:10:34,与我们预计的一样,消息延迟了10秒后去执行消费。

总结

终上所述我们完成了消息队列的延迟消费,采用死信方式,通过消息过期方式触发,在实际项目研发过程中,延迟消费还是很有必要的,可以省去一些定时任务的配置。

消息队列RabbitMQ消息延时消费

https://blog.minbox.org/rabbitmq-delay-consumer.html

作者

恒宇少年 - 于起宇

发布于

2019-09-29

更新于

2022-10-26

许可协议

评论