37.Spring Boot 消息队列 RocketMQ 入门
37.Spring Boot 消息队列 RocketMQ 入门
1. 概述
如果胖友还没了解过分布式消息队列 Apache RocketMQ ,建议先阅读下艿艿写的 《芋道 RocketMQ 极简入门》 文章。虽然这篇文章标题是安装部署,实际可以理解成《一文带你快速入门 RocketMQ》,哈哈哈。
考虑这是 RocketMQ 如何在 Spring Boot 整合与使用的文章,所以还是简单介绍下 RocketMQ 是什么?
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
ps: Metaq 3.0 版本改名,产品名称改为 RocketMQ
在本文中,我们会比 《芋道 RocketMQ 极简入门》 提供更多的生产者 Producer 和消费者 Consumer 的使用示例。例如说:
- Producer 三种发送消息的方式。
- Producer 发送顺序 消息,Consumer 顺序消费消息。
- Producer 发送定时消息。
- Producer 批量发送消息。
- Producer 发送事务消息。
- Consumer 广播 和集群消费消息。
胖友你就说,艿艿是不是很良心。😜
2. RocketMQ-Spring
RocketMQ-Spring 项目,RocketMQ 对 Spring 的集成支持。主要有两方面的功能:
- 功能一:支持 Spring Message 规范,方便开发者从其它 MQ 快速切换到 RocketMQ 。
- 功能二:帮助开发者在 Spring Boot 中快速集成 RocketMQ 。
我们先一起了解下功能一 。对于大多数国内的开发者,相信对 Spring Message 是比较陌生的,包括艿艿自己。所幸艿艿是一个专业的收藏家,无意中看到有篇文章介绍了 RocketMQ-Spring 在这块的设计上的想法:
FROM 《我用这种方法在 Spring 中实现消息的发送和消息》
Spring Messaging 是 Spring Framework 4 中添加的模块,是Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。
单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:
- 在消息发送端,需要实现的是一个 XXXTemplate 形式的 Java Bean ,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;
- 在消息的消费端,是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO ),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。
如果有兴趣深入的了解 Spring Messaging 及针对不同的消息产品的使用,推荐阅读这个文件。参考 Spring Messaging 的既有实现, RocketMQ 的 spring-boot-starter 中遵循了相关的设计模式,并结合 RocketMQ 自身的功能特点提供了相应的 API(如,顺序,异步和事务半消息等)。
这样一撸,是不是清晰多了。简单来说,RocketMQ-Spring 就是基于 Spring Message 来实现 RocketMQ 的发送端和接收端。
我们再一起了解下功能二 。比较好理解,就是提供了 RocketMQ 的 spring-boot-starter 功能,实现 RocketMQ 的自动化配置。
不过,这里艿艿还是想弱弱吐槽一句,RocketMQ 的官方 spring-boot-starter 真的有点出的太晚了。如下是整理的时间轴:
- 2014-08 Spring Boot 1 正式发布。
- 2018-03 Spring Boot 2 正式发布。
- 2018-12 RocketMQ 团队发布 RocketMQ 集成到 Spring Boot 的解决方案,并且提供了中文文档。
3. 快速入门
示例代码对应仓库:lab-31-rocketmq-demo 。
本小节,我们先来对 RocketMQ-Spring 做一个快速入门,实现 Producer 三种发送消息的方式的功能,同时创建一个 Consumer 消费消息。
考虑到一个应用既可以使用生产者 Producer ,又可以使用消费者 Consumer ,所以示例就做成一个 lab-31-rocketmq-demo 项目。
3.1 引入依赖
在 pom.xml
文件中,引入相关依赖。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>lab-31-rocketmq-demo</artifactId>
<dependencies>
<!-- 实现对 RocketMQ 的自动化配置 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
3.2 应用配置文件
在 resources
目录下,创建 application.yaml
配置文件。配置如下:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
# Producer 配置项
producer:
group: demo-producer-group # 生产者分组
send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
secret-key: # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
# Consumer 配置项
consumer:
listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
test-consumer-group:
topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
- 在
rocketmq
配置项,设置 RocketMQ 的配置,对应 RocketMQProperties 配置类。 - RocketMQ-Spring RocketMQAutoConfiguration 自动化配置类,实现 RocketMQ 的自动配置,创建相应的 Producer 和 Consumer 。
rocketmq.name-server
配置项,设置 RocketMQ Namesrv 地址。如果多个,使用逗号分隔。rocketmq.producer
配置项,一看就知道是 RocketMQ Producer 所独有。group
配置,生产者分组。retry-next-server
配置,发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为false
。如果胖友使用多主 Broker 的情况下,需要设置true
,这样才会在发送消息失败时,重试另外一台 Broker 。- 其它配置,一般默认即可。
rocketmq.consumer
配置项,一看就知道是 RocketMQ Consumer 所独有。listener
配置,配置某个消费分组,是否监听指定 Topic 。结构为Map<消费者分组, <Topic, Boolean>>
。默认情况下,不配置表示监听 。一般情况下,只有我们在想不监听消费某个消费分组的某个 Topic 时,才需要配listener
配置。
3.3 Application
创建 Application.java
类,配置 @SpringBootApplication
注解即可。代码如下:
// Application.java
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
3.5 Demo01Message
在 cn.iocoder.springboot.lab31.rocketmqdemo.message
包下,创建 Demo01Message 消息类,提供给当前示例使用。代码如下:
// Demo01Message.java
public class Demo01Message {
public static final String TOPIC = "DEMO_01";
/**
* 编号
*/
private Integer id;
// ... 省略 set/get/toString 方法
}
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_01"
。
3.6 Demo01Producer
在 cn.iocoder.springboot.lab31.rocketmqdemo.producer
包下,创建 Demo01Producer 类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现三种发送消息的方式。代码如下:
// Demo01Producer.java
@Component
public class Demo01Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSend(Demo01Message.TOPIC, message);
}
public void asyncSend(Integer id, SendCallback callback) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// 异步发送消息
rocketMQTemplate.asyncSend(Demo01Message.TOPIC, message, callback);
}
public void onewaySend(Integer id) {
// 创建 Demo01Message 消息
Demo01Message message = new Demo01Message();
message.setId(id);
// oneway 发送消息
rocketMQTemplate.sendOneWay(Demo01Message.TOPIC, message);
}
}
- 三个方法,对应三个 RocketMQ 发送消息的方式,分别调用 RocketMQTemplate 提供的
#syncSend(...)
和#asyncSend(...)
以及#sendOneWay(...)
方法。
我们来简单聊下 RocketMQTemplate 类,它继承 Spring Messaging 定义的 AbstractMessageSendingTemplate 抽象类,以达到融入 Spring Messaging 体系中。
在 RocketMQTemplate 中,会创建一个 RocketMQ DefaultMQProducer 生产者 producer
,所以 RocketMQTemplate 后续的各种发送消息的方法,都是使用它。😈 当然,因为 RocketMQTemplate 的封装,所以我们可以像使用 Spring Messaging 一样的方式,进行消息的发送,而无需直接使用 RocketMQ 提供的 Producer 发送消息。
对于胖友来说,可能最关心的是,消息 Message 是怎么序列化的。我们来看看 RocketMQUtil#convertToRocketMessage(...) 方法的代码:
// RocketMQTemplate.java
public SendResult syncSend(String destination, Object payload, long timeout) {
Message<?> message = MessageBuilder.withPayload(payload).build(); // <X>
// ... 省略其它代码
}
// RocketMQUti.java
public static org.apache.rocketmq.common.message.Message convertToRocketMessage(
MessageConverter messageConverter, String charset,
String destination, org.springframework.messaging.Message<?> message) {
Object payloadObj = message.getPayload();
byte[] payloads;
try {
if (null == payloadObj) {
throw new RuntimeException("the message cannot be empty");
}
// 如果是 String 类型,则直接获得其 byte[] 内容。
if (payloadObj instanceof String) {
payloads = ((String)payloadObj).getBytes(Charset.forName(charset));
// 如果是 byte[] 类型,则直接使用即可
} else if (payloadObj instanceof byte[]) {
payloads = (byte[])message.getPayload();
// 如果是复杂对象类型,则使用 MessageConverter 进行转换成字符串,然后再获得字符串的 byte[] 内容。
} else {
String jsonObj = (String)messageConverter.fromMessage(message, payloadObj.getClass());
if (null == jsonObj) {
throw new RuntimeException(String.format(
"empty after conversion [messageConverter:%s,payloadClass:%s,payloadObj:%s]",
messageConverter.getClass(), payloadObj.getClass(), payloadObj));
}
payloads = jsonObj.getBytes(Charset.forName(charset));
}
} catch (Exception e) {
throw new RuntimeException("convert to RocketMQ message failed.", e);
}
// 转换成 RocketMQ Message
return getAndWrapMessage(destination, message.getHeaders(), payloads);
}
- 在
<X>
处,RocketMQTemplate 会通过 Spring Messaging 的 MessageBuilder 将我们传入的消息payload
转换成 Spring Messaging 的 Message 消息对象。 RocketMQUtil#convertToRocketMessage(...)
的代码,胖友自己看下艿艿添加的注释,进行下理解。因为我们一般消息都是复杂对象 类型,所以会采用 MessageConverter 进行转换。RocketMQ-Spring 的默认使用 MappingJackson2MessageConverter 或 MappingFastJsonMessageConverter ,即使用 JSON 格式序列化和反序列化 Message 消息内容。为什么是这两个 MessageConverter ,胖友可以自己看看 RocketMQ-Spring 的 MessageConverterConfiguration 配置类。
3.7 Demo01Consumer
在 cn.iocoder.springboot.lab31.rocketmqdemo.consumer
包下,创建 Demo01Consumer 类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
// Demo01Consumer.java
@Component
@RocketMQMessageListener(
topic = Demo01Message.TOPIC,
consumerGroup = "demo01-consumer-group-" + Demo01Message.TOPIC
)
public class Demo01Consumer implements RocketMQListener<Demo01Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 在类上,添加了
@RocketMQMessageListener
注解,声明消费的 Topic 是"DEMO_01"
,消费者分组是"demo01-consumer-group-DEMO_01"
。一般情况下,我们建议一个消费者分组,仅消费一个 Topic 。这样做会有两个好处:- 每个消费者分组职责单一,只消费一个 Topic 。
- 每个消费者分组是独占一个线程池,这样能够保证多个 Topic 隔离在不同线程池,保证隔离性,从而避免一个 Topic 消费很慢,影响到另外的 Topic 的消费。
- 实现 RocketMQListener 接口,在
T
泛型里,设置消费的消息对应的类。此处,我们就设置了 Demo01Message 类。
3.9 Demo01AConsumer
在 cn.iocoder.springboot.lab31.rocketmqdemo.consumer
包下,创建 Demo01AConsumer 类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
@Component
@RocketMQMessageListener(
topic = Demo01Message.TOPIC,
consumerGroup = "demo01-A-consumer-group-" + Demo01Message.TOPIC
)
public class Demo01AConsumer implements RocketMQListener<MessageExt> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(MessageExt message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 整体和 「3.8 Demo01Consumer」 是一致的,主要有两个差异点,也是为什么我们又额外创建了这个消费者的原因。
差异一 ,在类上,添加了 @RocketMQMessageListener
注解,声明消费的 Topic 还是 "DEMO_01"
,消费者分组修改成 了 "demo01-A-consumer-group-DEMO_01"
。这样,我们就可以测试 RocketMQ 集群消费的特性。
集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
- 也就是说,如果我们发送一条 Topic 为
"DEMO_01"
的消息,可以分别被"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都消费一次。 - 但是,如果我们启动两个该示例的实例,则消费者分组
"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都会有多个 Consumer 示例。此时,我们再发送一条 Topic 为"DEMO_01"
的消息,只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次,也同样只会被"demo01-consumer-group-DEMO_01"
的一个 Consumer 消费一次。
好好理解上述的两段话,非常重要。
通过集群消费 的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER"
的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:
- 积分模块:判断如果是手机注册,给用户增加 20 积分。
- 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
- 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
- ... 等等
这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。
差异二 ,实现 RocketMQListener 接口,在 T
泛型里,设置消费的消息对应的类不是 Demo01Message 类,而是 RocketMQ 内置的 MessageExt 类。通过 MessageExt 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(body
)就需要自己去反序列化。当然,一般情况下,我们不会使用 MessageExt 类。
3.10 简单测试
创建 Demo01ProducerTest 测试类,编写三个单元测试方法,调用 Demo01Producer 三种发送消息的方式。代码如下:
// Demo01ProducerTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo01ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo01Producer producer;
@Test
public void testSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testASyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
producer.asyncSend(id, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
logger.info("[testASyncSend][发送编号:[{}] 发送成功,结果为:[{}]]", id, result);
}
@Override
public void onException(Throwable e) {
logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e);
}
});
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testOnewaySend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
producer.onewaySend(id);
logger.info("[testOnewaySend][发送编号:[{}] 发送完成]", id);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
我们来执行 #testSyncSend()
方法,测试同步发送消息。控制台输出如下:
# Producer 同步发送消息成功
2019-12-05 13:48:57.342 INFO 79342 --- [ main] c.i.s.l.r.producer.Demo01ProducerTest : [testSyncSend][发送编号:[1575438537] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C35EE18B4AAC2126A02770000, offsetMsgId=C0A8032C00002A9F000000000010E628, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=0], queueOffset=255]]]
# Demo01AConsumer 消费了一次该消息
2019-12-05 13:48:57.347 INFO 79342 --- [MessageThread_1] c.i.s.l.r.consumer.Demo01AConsumer : [onMessage][线程编号:45 消息内容:MessageExt [queueId=0, storeSize=284, queueOffset=255, sysFlag=0, bornTimestamp=1575438537338, bornHost=/192.168.3.44:57823, storeTimestamp=1575438537340, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000010E628, commitLogOffset=1107496, bodyCRC=1962202087, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=256, CONSUME_START_TIME=1575438537347, id=b0e72a1c-cb11-5152-7d0d-c034b118a3e5, UNIQ_KEY=C0A8032C35EE18B4AAC2126A02770000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1575438537333}, body=[123, 34, 105, 100, 34, 58, 49, 53, 55, 53, 52, 51, 56, 53, 51, 55, 125], transactionId='null'}]]
# Demo01Consumer 消费了一次该消息
2019-12-05 13:49:00.150 INFO 79342 --- [MessageThread_1] c.i.s.l.r.consumer.Demo01Consumer : [onMessage][线程编号:51 消息内容:Demo01Message{id=1575438537}]
- 通过日志我们可以看到,我们发送的消息,分别被 Demo01AConsumer 和 Demo01Consumer 两个消费者(消费者分组)都消费了一次。
- 同时,两个消费者在不同的线程池中,消费了这条消息。虽然说,我们看到两条日志里,我们都看到了线程名为
"MessageThread_1"
,但是线程编号分别是 45 和 51 。😈 因为,每个 RocketMQ Consumer 的消费线程池创建的线程都是以"MessageThread_"
开头,同时这里相同的线程名结果不同的线程编号,很容易判断出时候用了两个不同的消费线程池。
我们来执行 #testASyncSend()
方法,测试异步发送消息。控制台输出如下:
友情提示:注意,不要关闭
#testSyncSend()
单元测试方法,因为我们要模拟每个消费者集群,都有多个 Consumer 节点。
// Producer 异步发送消息成功
2019-12-05 13:56:34.366 INFO 79642 --- [ublicExecutor_4] c.i.s.l.r.producer.Demo01ProducerTest : [testASyncSend][发送编号:[1575438994] 发送成功,结果为:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C371A18B4AAC21270FBB70000, offsetMsgId=C0A8032C00002A9F000000000010E8CA, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=256]]]
# Demo01AConsumer 消费了一次该消息
2019-12-05 13:56:34.370 INFO 79642 --- [MessageThread_1] c.i.s.l.r.consumer.Demo01AConsumer : [onMessage][线程编号:47 消息内容:MessageExt [queueId=3, storeSize=284, queueOffset=256, sysFlag=0, bornTimestamp=1575438994361, bornHost=/192.168.3.44:57926, storeTimestamp=1575438994364, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000010E8CA, commitLogOffset=1108170, bodyCRC=412662346, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=257, CONSUME_START_TIME=1575438994370, id=80b9f381-febe-6cda-02e7-43bf8f8a5c8a, UNIQ_KEY=C0A8032C371A18B4AAC21270FBB70000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1575438994356}, body=[123, 34, 105, 100, 34, 58, 49, 53, 55, 53, 52, 51, 56, 57, 57, 52, 125], transactionId='null'}]]
# Demo01Consumer 消费了一次该消息
2019-12-05 13:56:34.402 INFO 79642 --- [MessageThread_1] c.i.s.l.r.consumer.Demo01Consumer : [onMessage][线程编号:46 消息内容:Demo01Message{id=1575438994}]
- 和
#testSyncSend()
方法执行的结果,是一致的。此时,我们打开#testSyncSend()
方法所在的控制台,不会看到有消息消费的日志。说明,符合集群消费的机制:集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。。 - 😈 不过如上的日志,也可能出现在
#testSyncSend()
方法所在的控制台,而不在#testASyncSend()
方法所在的控制台。
3.11 @RocketMQMessageListener
在 「3.8 Demo01Consumer」 中,我们已经使用了 @RocketMQMessageListener
注解,设置每个 RocketMQ 消费者 Consumer 的消息监听器的配置。
@RocketMQMessageListener
注解的常用属性如下:
/**
* Consumer 所属消费者分组
*
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
String consumerGroup();
/**
* 消费的 Topic
*
* Topic name.
*/
String topic();
/**
* 选择器类型。默认基于 Message 的 Tag 选择。
*
* Control how to selector message.
*
* @see SelectorType
*/
SelectorType selectorType() default SelectorType.TAG;
/**
* 选择器的表达式。
* 设置为 * 时,表示全部。
*
* 如果使用 SelectorType.TAG 类型,则设置消费 Message 的具体 Tag 。
* 如果使用 SelectorType.SQL92 类型,可见 https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/ 文档
*
* Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
*/
String selectorExpression() default "*";
/**
* 消费模式。可选择并发消费,还是顺序消费。
*
* Control consume mode, you can choice receive message concurrently or orderly.
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* 消息模型。可选择是集群消费,还是广播消费。
*
* Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* 消费的线程池的最大线程数
*
* Max consumer thread number.
*/
int consumeThreadMax() default 64;
/**
* 消费单条消息的超时时间
*
* Max consumer timeout, default 30s.
*/
long consumeTimeout() default 30000L;
@RocketMQMessageListener
注解的不常用属性如下:
// 默认从配置文件读取的占位符
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
* The property of "access-key".
*/
String accessKey() default ACCESS_KEY_PLACEHOLDER;
/**
* The property of "secret-key".
*/
String secretKey() default SECRET_KEY_PLACEHOLDER;
/**
* Switch flag instance for message trace.
*/
boolean enableMsgTrace() default true;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
/**
* Consumer 连接的 RocketMQ Namesrv 地址。默认情况下,使用 `rocketmq.name-server` 配置项即可。
*
* 如果一个项目中,Consumer 需要使用不同的 RocketMQ Namesrv ,则需要配置该属性。
*
* The property of "name-server".
*/
String nameServer() default NAME_SERVER_PLACEHOLDER;
/**
* 访问通道。目前有 LOCAL 和 CLOUD 两种通道。
*
* LOCAL ,指的是本地部署的 RocketMQ 开源项目。
* CLOUD ,指的是阿里云的 ONS 服务。具体可见 https://help.aliyun.com/document_detail/128585.html 文档。
*
* The property of "access-channel".
*/
String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
3.12 @ExtRocketMQTemplateConfiguration
RocketMQ-Spring 考虑到开发者可能需要连接多个不同的 RocketMQ 集群,所以提供了 @ExtRocketMQTemplateConfiguration
注解,实现配置连接不同 RocketMQ 集群的 Producer 的 RocketMQTemplate Bean 对象。
@ExtRocketMQTemplateConfiguration
注解的具体属性,和我们在 「3.2 应用配置文件」 的 rocketmq.producer
配置项是一致的,就不重复赘述啦。
@ExtRocketMQTemplateConfiguration
注解的简单使用示例,代码如下:
@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer:demo.rocketmq.name-server}")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
- 在类上,添加
@ExtRocketMQTemplateConfiguration
注解,并设置连接的 RocketMQ Namesrv 地址。 - 同时,需要继承 RocketMQTemplate 类,从而使我们可以直接使用
@Autowire
或@Resource
注解,注入 RocketMQTemplate Bean 属性。
4. 批量发送消息
示例代码对应仓库:lab-31-rocketmq-demo 。
在一些业务场景下,我们希望使用 Producer 批量发送消息,提高发送性能。在 RocketMQTemplate 中,提供了一个方法方法批量发送消息的方法。代码如下:
// RocketMQTemplate.java
public <T extends Message> SendResult syncSend(String destination, Collection<T> messages, long timeout) {
// ... 省略具体代码实现
}
- 通过方法参数
destination
可知,必须发送相同 Topic 的消息。 - 要注意方法参数
messages
,每个集合的元素必须是 Spring Messaging 定义的 Message 消息。😈 RocketMQTemplate 重载了非常多的#syncSend(...)
方法,一定要小心哟。 - 通过方法名可知,这个是同步批量发送消息。
有一点要注意,虽然是批量发送多条消息,但是是以所有消息加起来的大小,不能超过消息的最大大小的限制,而不是按照单条计算。😈 所以,一次性发送的消息特别多,还是需要分批的进行批量发送。
下面,我们开始本小节的示例。后续的小节,如果非必要的说明,我们都直接在 lab-31-rocketmq-demo 项目中,进行示例的增加。
4.1 Demo02Message
在 cn.iocoder.springboot.lab31.rocketmqdemo.message
包下,创建 Demo02Message 消息类,提供给当前示例使用。代码如下:
// Demo02Message.java
public class Demo02Message {
public static final String TOPIC = "DEMO_02";
/**
* 编号
*/
private Integer id;
// ... 省略 set/get/toString 方法
}
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_02"
。- 其它都和 「3.5 Demo01Message」 是一样的。重新申明的原因是,避免污染 「3. 快速入门」 。😈 后续,每个小节的内容,我们也会通过创建新的 Message 类,保证多个示例之间的独立。
4.2 Demo02Producer
在 cn.iocoder.springboot.lab31.rocketmqdemo.producer
包下,创建 Demo02Producer 类,它会使用 RocketMQTemplate 实现批量发送消息。代码如下:
// Demo02Producer.java
@Component
public class Demo02Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult sendBatch(Collection<Integer> ids) {
// <X> 创建多条 Demo02Message 消息
List<Message> messages = new ArrayList<>(ids.size());
for (Integer id : ids) {
// 创建 Demo02Message 消息
Demo02Message message = new Demo02Message().setId(id);
// 构建 Spring Messaging 定义的 Message 消息
messages.add(MessageBuilder.withPayload(message).build());
}
// 同步批量发送消息
return rocketMQTemplate.syncSend(Demo02Message.TOPIC, messages, 30 * 1000L);
}
}
- 注意,在
<X>
处,我们就创建了 Spring Messaging 定义的 Message 消息的数组,用于下面使用 RocketMQTemplate 批量发送消息。
4.3 Demo02Consumer
在 cn.iocoder.springboot.lab31.rocketmqdemo.consumer
包下,创建 Demo02Consumer 类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
// Demo02Consumer.java
@Component
@RocketMQMessageListener(
topic = Demo02Message.TOPIC,
consumerGroup = "demo02-consumer-group-" + Demo02Message.TOPIC
)
public class Demo02Consumer implements RocketMQListener<Demo02Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo02Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 虽然说,Demo02Message 消息是批量发送的,但是我们还是可以和 「3.8 Demo1Consumer」 一样,逐条消费消息。
4.4 简单测试
创建 Demo02ProducerTest 测试类,编写一个单元测试方法,调用 Demo02Producer 批量发送消息。代码如下:
// Demo02ProducerTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo02ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo02Producer producer;
@Test
public void testSendBatch() throws InterruptedException {
List<Integer> ids = Arrays.asList(1, 2, 3);
SendResult result = producer.sendBatch(ids);
logger.info("[testSendBatch][发送编号:[{}] 发送结果:[{}]]", ids, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
我们来执行 #testSendBatch()
方法,测试批量发送消息。控制台输出如下:
# Producer 批量发送三条消息成功
2019-12-05 15:04:50.173 INFO 82497 --- [ main] c.i.s.l.r.producer.Demo02ProducerTest : [testSendBatch][发送编号:[[1, 2, 3]] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C424118B4AAC212AF7AF60000,C0A8032C424118B4AAC212AF7AF60001,C0A8032C424118B4AAC212AF7AF60002, offsetMsgId=C0A8032C00002A9F000000000011150C,C0A8032C00002A9F0000000000111608,C0A8032C00002A9F0000000000111704, messageQueue=MessageQueue [topic=DEMO_02, brokerName=broker-a, queueId=0], queueOffset=1]]]
# 三条消息,被逐条消费
2019-12-05 15:04:52.979 INFO 82497 --- [MessageThread_6] c.i.s.l.r.consumer.Demo02Consumer : [onMessage][线程编号:61 消息内容:Demo01Message{id=3}]
2019-12-05 15:04:52.979 INFO 82497 --- [MessageThread_1] c.i.s.l.r.consumer.Demo02Consumer : [onMessage][线程编号:56 消息内容:Demo01Message{id=1}]
2019-12-05 15:04:52.979 INFO 82497 --- [MessageThread_3] c.i.s.l.r.consumer.Demo02Consumer : [onMessage][线程编号:59 消息内容:Demo01Message{id=2}]
- 😈 我们可以看到三条消息,被 Demo02Consumer 并发消费完成。
5. 定时消息
示例代码对应仓库:lab-31-rocketmq-demo 。
在 RocketMQ 中,提供定时消息的功能。
定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。
不过,RocketMQ 暂时不支持任意的时间精度的延迟,而是固化了 18 个延迟级别。如下表格:
延迟级别 | 时间 | 延迟级别 | 时间 | 延迟级别 | 时间 |
---|---|---|---|---|---|
1 | 1s | 7 | 3m | 13 | 9m |
2 | 5s | 8 | 4m | 14 | 10m |
3 | 10s | 9 | 5m | 15 | 20m |
4 | 30s | 10 | 6m | 16 | 30m |
5 | 1m | 11 | 7m | 17 | 1h |
6 | 2m | 12 | 8m | 18 | 2h |
如果胖友想要任一时刻的定时消息,可以考虑借助 MySQL + Job 来实现。又或者考虑使用 DDMQ(滴滴打车基于 RocketMQ 和 Kafka 改造的开源消息队列) 。
下面,我们开始本小节的示例。
5.1 Demo03Message
在 cn.iocoder.springboot.lab31.rocketmqdemo.message
包下,创建 Demo03Message 消息类,提供给当前示例使用。代码如下:
public class Demo03Message {
public static final String TOPIC = "DEMO_03";
/**
* 编号
*/
private Integer id;
// ... 省略 set/get/toString 方法
}
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_03"
。
5.2 Demo03Producer
在 cn.iocoder.springboot.lab31.rocketmqdemo.producer
包下,创建 Demo03Producer 类,它会使用 RocketMQTemplate 实现发送定时消息。代码如下:
// Demo03Producer.java
@Component
public class Demo03Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSendDelay(Integer id, int delayLevel) {
// 创建 Demo03Message 消息
Message message = MessageBuilder.withPayload(new Demo03Message().setId(id))
.build();
// 同步发送消息
return rocketMQTemplate.syncSend(Demo03Message.TOPIC, message, 30 * 1000,
delayLevel);
}
public void asyncSendDelay(Integer id, int delayLevel, SendCallback callback) {
// 创建 Demo03Message 消息
Message message = MessageBuilder.withPayload(new Demo03Message().setId(id))
.build();
// 同步发送消息
rocketMQTemplate.asyncSend(Demo03Message.TOPIC, message, callback, 30 * 1000,
delayLevel);
}
}
- 定时消息,目前只支持同步 和异步发送定时消息。
5.3 Demo03Consumer
在 cn.iocoder.springboot.lab31.rocketmqdemo.consumer
包下,创建 Demo03Consumer 类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
// Demo03Consumer.java
@Component
@RocketMQMessageListener(
topic = Demo03Message.TOPIC,
consumerGroup = "demo03-consumer-group-" + Demo03Message.TOPIC
)
public class Demo03Consumer implements RocketMQListener<Demo03Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo03Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 和 「3.8 Demo1Consumer」 是一样,还是逐条消费消息。
5.4 简单测试
创建 Demo03ProducerTest 测试类,编写一个单元测试方法,调用 Demo03Producer 发送定时消息。代码如下:
// Demo03ProducerTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo03ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo03Producer producer;
@Test
public void testSyncSendDelay() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSendDelay(id, 3); // 延迟级别 3 ,即 10 秒后消费
logger.info("[testSyncSendDelay][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
我们来执行 #testSyncSendDelay()
方法,测试发送定时消息。控制台输出如下:
# Producer 发送定时消息成功
2019-12-05 15:53:27.222 INFO 85492 --- [ main] c.i.s.l.r.producer.Demo03ProducerTest : [testSyncSendDelay][发送编号:[1575446007] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C4DF418B4AAC212DBFDB00006, offsetMsgId=C0A8032C00002A9F00000000001155C2, messageQueue=MessageQueue [topic=DEMO_03, brokerName=broker-a, queueId=0], queueOffset=5]]]
# 因为该消息的延迟级别是 3 ,所以 10 秒后被 Demo03Consumer 消费到
2019-12-05 15:53:37.226 INFO 85492 --- [MessageThread_1] c.i.s.l.r.consumer.Demo03Consumer : [onMessage][线程编号:60 消息内容:Demo03Message{id=1575446007}]
- 发送的消息,延迟 10 秒被 Demo03Consumer 消费。
6. 消费重试
示例代码对应仓库:lab-31-rocketmq-demo 。
RocketMQ 提供消费重试 的机制。在消息消费失败 的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
每条消息的失败重试,是有一定的间隔时间。实际上,消费重试是基于「5. 定时消息」 来实现,第一次重试消费按照延迟级别为 3 开始。😈 所以,默认为 16 次重试消费,也非常好理解,毕竟延迟级别最高为 18 呀。
不过要注意,只有集群消费模式下,才有消息重试。
下面,我们开始本小节的示例。
6.1 Demo04Message
在 cn.iocoder.springboot.lab31.rocketmqdemo.message
包下,创建 Demo04Message 消息类,提供给当前示例使用。代码如下:
public class Demo04Message {
public static final String TOPIC = "DEMO_04";
/**
* 编号
*/
private Integer id;
// ... 省略 set/get/toString 方法
}
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_04"
。
6.2 Demo04Producer
在 cn.iocoder.springboot.lab31.rocketmqdemo.producer
包下,创建 Demo04Producer 类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现同步发送消息。代码如下:
// Demo04Producer.java
@Component
public class Demo04Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) {
// 创建 Demo04Message 消息
Demo04Message message = new Demo04Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSend(Demo04Message.TOPIC, message);
}
}
- 代码上,并没有什么特别,和 「3.6 Demo01Producer」 的同步发送消息的代码是一致的,除了消息换成了 Demo04Message 。
6.3 Demo04Consumer
在 cn.iocoder.springboot.lab31.rocketmqdemo.consumer
包下,创建 Demo04Consumer 类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
// Demo04Consumer.java
@Component
@RocketMQMessageListener(
topic = Demo04Message.TOPIC,
consumerGroup = "demo04-consumer-group-" + Demo04Message.TOPIC
)
public class Demo04Consumer implements RocketMQListener<Demo04Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo04Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}
}
- 在
<X>
处,我们在消费消息时候,抛出一个 RuntimeException 异常,模拟消费失败。
6.4 简单测试
创建 Demo04ProducerTest 测试类,编写一个单元测试方法,调用 Demo04Producer 同步发送消息。代码如下:
// Demo04ProducerTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo04ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo04Producer producer;
@Test
public void testSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
我们来执行 #testSyncSend()
方法,同步发送消息。控制台输出如下:
# Producer 同步发送消息成功
2019-12-05 16:42:00.603 INFO 87651 --- [ main] c.i.s.l.r.producer.Demo04ProducerTest : [testSyncSend][发送编号:[1575448920] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C566318B4AAC2130872110000, offsetMsgId=C0A8032C00002A9F0000000000122185, messageQueue=MessageQueue [topic=DEMO_04, brokerName=broker-a, queueId=1], queueOffset=0]]]
# Demo04Consumer 第一次消费失败,抛出 RuntimeException 异常
2019-12-05 16:42:23.497 INFO 87651 --- [MessageThread_1] c.i.s.l.r.consumer.Demo04Consumer : [onMessage][线程编号:57 消息内容:Demo04Message{id=1575448920}]
2019-12-05 16:42:23.501 WARN 87651 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [queueId=1, storeSize=284, queueOffset=0, sysFlag=0, bornTimestamp=1575448920596, bornHost=/192.168.3.44:62472, storeTimestamp=1575448920601, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F0000000000122185, commitLogOffset=1188229, bodyCRC=1839505431, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1575448943433, id=d9d1ced2-bc75-3378-4c31-1c0a3691f1bc, UNIQ_KEY=C0A8032C566318B4AAC2130872110000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1575448920588}, body=[123, 34, 105, 100, 34, 58, 49, 53, 55, 53, 52, 52, 56, 57, 50, 48, 125], transactionId='null'}]
java.lang.RuntimeException: 我就是故意抛出一个异常
// 此处,省略堆栈...
# Demo04Consumer 第一次重试消费失败,抛出 RuntimeException 异常。间隔了 10 秒,对应延迟级别 3 。
2019-12-05 16:42:33.509 INFO 87651 --- [MessageThread_2] c.i.s.l.r.consumer.Demo04Consumer : [onMessage][线程编号:58 消息内容:Demo04Message{id=1575448920}]
2019-12-05 16:42:33.510 WARN 87651 --- [MessageThread_2] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [queueId=0, storeSize=451, queueOffset=0, sysFlag=0, bornTimestamp=1575448920596, bornHost=/192.168.3.44:62472, storeTimestamp=1575448953506, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000012272C, commitLogOffset=1189676, bodyCRC=1839505431, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={CONSUME_START_TIME=1575448953509, MIN_OFFSET=0, REAL_TOPIC=%RETRY%demo04-consumer-group-DEMO_04, ORIGIN_MESSAGE_ID=C0A8032C00002A9F0000000000122185, RETRY_TOPIC=DEMO_04, MAX_OFFSET=1, id=d9d1ced2-bc75-3378-4c31-1c0a3691f1bc, UNIQ_KEY=C0A8032C566318B4AAC2130872110000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, DELAY=3, timestamp=1575448920588, REAL_QID=0}, body=[123, 34, 105, 100, 34, 58, 49, 53, 55, 53, 52, 52, 56, 57, 50, 48, 125], transactionId='null'}]
java.lang.RuntimeException: 我就是故意抛出一个异常
// 此处,省略堆栈...
# Demo04Consumer 第二次重试消费失败,抛出 RuntimeException 异常。间隔了 30 秒,对应延迟级别 4 。
2019-12-05 16:43:03.519 INFO 87651 --- [MessageThread_3] c.i.s.l.r.consumer.Demo04Consumer : [onMessage][线程编号:59 消息内容:Demo04Message{id=1575448920}]
2019-12-05 16:43:03.519 WARN 87651 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [queueId=0, storeSize=451, queueOffset=1, sysFlag=0, bornTimestamp=1575448920596, bornHost=/192.168.3.44:62472, storeTimestamp=1575448983514, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F0000000000122AA1, commitLogOffset=1190561, bodyCRC=1839505431, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='DEMO_04', flag=0, properties={CONSUME_START_TIME=1575448983519, MIN_OFFSET=0, REAL_TOPIC=%RETRY%demo04-consumer-group-DEMO_04, ORIGIN_MESSAGE_ID=C0A8032C00002A9F0000000000122185, RETRY_TOPIC=DEMO_04, MAX_OFFSET=2, id=d9d1ced2-bc75-3378-4c31-1c0a3691f1bc, UNIQ_KEY=C0A8032C566318B4AAC2130872110000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, DELAY=4, timestamp=1575448920588, REAL_QID=0}, body=[123, 34, 105, 100, 34, 58, 49, 53, 55, 53, 52, 52, 56, 57, 50, 48, 125], transactionId='null'}]
java.lang.RuntimeException: 我就是故意抛出一个异常
// 此处,省略堆栈...
- 从日志中,我们可以看到,消息因为消费失败后,又重试消费了多次。
7. 广播消费
示例代码对应仓库:lab-31-rocketmq-demo 。
在上述的示例中,我们看到的都是使用集群消费。而在一些场景下,我们需要使用广播消费。
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。
下面,我们开始本小节的示例。
7.1 Demo05Message
在 cn.iocoder.springboot.lab31.rocketmqdemo.message
包下,创建 Demo05Message 消息类,提供给当前示例使用。代码如下:
public class Demo05Message {
public static final String TOPIC = "DEMO_05";
/**
* 编号
*/
private Integer id;
// ... 省略 set/get/toString 方法
}
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_05"
。
7.2 Demo05Producer
在 cn.iocoder.springboot.lab31.rocketmqdemo.producer
包下,创建 Demo04Producer 类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现同步发送消息。代码如下:
// Demo05Producer.java
@Component
public class Demo05Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) {
// 创建 Demo05Message 消息
Demo05Message message = new Demo05Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSend(Demo05Message.TOPIC, message);
}
}
- 代码上,并没有什么特别,和 「3.6 Demo01Producer」 的同步发送消息的代码是一致的,除了消息换成了 Demo05Message 。
7.3 Demo05Consumer
在 cn.iocoder.springboot.lab31.rocketmqdemo.consumer
包下,创建 Demo05Consumer 类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
// Demo05Consumer.java
@Component
@RocketMQMessageListener(
topic = Demo05Message.TOPIC,
consumerGroup = "demo05-consumer-group-" + Demo05Message.TOPIC,
messageModel = MessageModel.BROADCASTING // 设置为广播消费
)
public class Demo05Consumer implements RocketMQListener<Demo05Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo05Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 差异点,主要是
@RocketMQMessageListener
注解,通过设置了messageModel = MessageModel.BROADCASTING
,表示使用广播消费。
7.4 简单测试
创建 Demo05ProducerTest 测试类,用于测试广播消费。代码如下:
// Demo05ProducerTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo05ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo05Producer producer;
@Test
public void test() throws InterruptedException {
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
- 首先,执行
#test()
测试方法,先启动一个消费者分组"demo05-consumer-group-DEMO_05"
的 Consumer 节点。 - 然后,执行
#testSyncSend()
测试方法,先启动一个消费者分组"demo05-consumer-group-DEMO_05"
的 Consumer 节点。同时,该测试方法,调用Demo05ProducerTest#syncSend(id)
方法,同步发送了一条消息。控制台输出如下:
// #### testSyncSend 方法对应的控制台 ####
# Producer 同步发送消息成功
2019-12-05 17:26:00.439 INFO 89499 --- [ main] c.i.s.l.r.producer.Demo05ProducerTest : [testSyncSend][发送编号:[1575451560] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C5D9B18B4AAC21330B9F00000, offsetMsgId=C0A8032C00002A9F0000000000124421, messageQueue=MessageQueue [topic=DEMO_05, brokerName=broker-a, queueId=0], queueOffset=1]]]
# Demo05Consumer 消费了该消息
2019-12-05 17:26:03.271 INFO 89499 --- [MessageThread_1] c.i.s.l.r.consumer.Demo05Consumer : [onMessage][线程编号:63 消息内容:Demo05Message{id=1575451560}]
// ### test 方法对应的控制台 ####
# Demo05Consumer 也消费了该消息
2019-12-05 17:26:00.440 INFO 89490 --- [MessageThread_1] c.i.s.l.r.consumer.Demo05Consumer : [onMessage][线程编号:70 消息内容:Demo05Message{id=1575451560}]
- 消费者分组
"demo05-consumer-group-DEMO_05"
的两个 Consumer 节点,都消费了这条发送的消息。符合广播消费的预期~
8. 顺序消息
示例代码对应仓库:lab-31-rocketmq-demo 。
RocketMQ 提供了两种顺序级别:
- 普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列。
- 完全严格顺序 :在【普通顺序消息】的基础上,Consumer 严格顺序消费。
目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
如下是 RocketMQ 官方文档对这两种顺序级别的定义:
- 普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
- 严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
下面,我们开始本小节的示例。
8.1 Demo06Message
在 cn.iocoder.springboot.lab31.rocketmqdemo.message
包下,创建 Demo06Message 消息类,提供给当前示例使用。代码如下:
public class Demo06Message {
public static final String TOPIC = "DEMO_06";
/**
* 编号
*/
private Integer id;
// ... 省略 set/get/toString 方法
}
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_06"
。
8.2 Demo06Producer
在 cn.iocoder.springboot.lab31.rocketmqdemo.producer
包下,创建 Demo06Producer 类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现三种发送顺序消息的方式。代码如下:
// Demo06Producer.java
@Component
public class Demo06Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSendOrderly(Integer id) {
// 创建 Demo06Message 消息
Demo06Message message = new Demo06Message();
message.setId(id);
// 同步发送消息
return rocketMQTemplate.syncSendOrderly(Demo06Message.TOPIC, message, String.valueOf(id));
}
public void asyncSendOrderly(Integer id, SendCallback callback) {
// 创建 Demo06Message 消息
Demo06Message message = new Demo06Message();
message.setId(id);
// 异步发送消息
rocketMQTemplate.asyncSendOrderly(Demo06Message.TOPIC, message, String.valueOf(id), callback);
}
public void onewaySendOrderly(Integer id) {
// 创建 Demo06Message 消息
Demo06Message message = new Demo06Message();
message.setId(id);
// 异步发送消息
rocketMQTemplate.sendOneWayOrderly(Demo06Message.TOPIC, message, String.valueOf(id));
}
}
- 相比 「3.6 Demo01Producer」 来说,调用了对应的 Orderly 方法,从而实现发送顺序消息。
- 同时,需要传入方法参数
hashKey
,作为选择消息队列的键。
@param hashKey use this key to select queue. for example: orderId, productId ...
一般情况下,可以使用订单号、商品号、用户编号。
RocketMQTemplate 在发送顺序消息时,默认采用 SelectMessageQueueByHash 策略。如此,相同的 hashKey
的消息,就可以发送到相同的 Topic 的对应队列中。这种形式,就是我们上文提到的普通顺序消息的方式。
8.3 Demo06Consumer
在 cn.iocoder.springboot.lab31.rocketmqdemo.consumer
包下,创建 Demo06Consumer 类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
在 RocketMQ 中,Producer 可以根据定义 MessageQueueSelector 消息队列选择策略,选择 Topic 下的队列。目前提供三种策略:
SelectMessageQueueByHash ,基于 hashKey
的哈希值取余,选择对应的队列。 SelectMessageQueueByRandom ,基于随机的策略,选择队列。 SelectMessageQueueByMachineRoom ,😈 有点看不懂,目前是空的实现,暂时无视吧。 未使用 MessageQueueSelector 时,采用轮询的策略,选择队列。
// Demo06Consumer.java
@Component
@RocketMQMessageListener(
topic = Demo06Message.TOPIC,
consumerGroup = "demo06-consumer-group-" + Demo06Message.TOPIC,
consumeMode = ConsumeMode.ORDERLY // 设置为顺序消费
)
public class Demo06Consumer implements RocketMQListener<Demo06Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo06Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// sleep 2 秒,用于查看顺序消费的效果
try {
Thread.sleep(2 * 1000L);
} catch (InterruptedException ignore) {
}
}
}
- 差异点,主要是
@RocketMQMessageListener
注解,通过设置了consumeMode = ConsumeMode.ORDERLY
,表示使用顺序消费。
8.4 简单测试
创建 Demo06ProducerTest 测试类,编写三个单元测试方法,调用 Demo06Producer 三种发送顺序消息的方式。代码如下:
// Demo06ProducerTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo06ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo06Producer producer;
@Test
public void testSyncSendOrderly() throws InterruptedException {
// 发送多条消息
for (int i = 0; i < 3; i++) {
int id = 1024; // 固定成 1024 ,方便我们测试是否发送到相同消息队列
SendResult result = producer.syncSendOrderly(id);
logger.info("[testSyncSendOrderly][发送编号:[{}] 发送结果:[{}]]", id, result);
}
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testASyncSendOrderly() throws InterruptedException {
for (int i = 0; i < 3; i++) {
int id = 1024; // 固定成 1024 ,方便我们测试是否发送到相同消息队列
producer.asyncSendOrderly(id, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
logger.info("[testASyncSendOrderly][发送编号:[{}] 发送成功,结果为:[{}]]", id, result);
}
@Override
public void onException(Throwable e) {
logger.info("[testASyncSendOrderly][发送编号:[{}] 发送异常]]", id, e);
}
});
}
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
@Test
public void testOnewaySendOrderly() throws InterruptedException {
for (int i = 0; i < 3; i++) {
int id = 1024; // 固定成 1024 ,方便我们测试是否发送到相同消息队列
producer.onewaySendOrderly(id);
logger.info("[testOnewaySendOrderly][发送编号:[{}] 发送完成]", id);
}
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
我们来执行 #testSyncSendOrderly()
方法,测试同步发送顺序消息。控制台输出如下:
# Producer 同步发送 3 条顺序消息成功,都发送到了 Topic 为 DEMO_06 ,队列编号为 1 的消息队列上
2019-12-05 21:04:58.887 INFO 94854 --- [ main] c.i.s.l.r.producer.Demo06ProducerTest : [testSyncSendOrderly][发送编号:[1024] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C728618B4AAC213F934030002, offsetMsgId=C0A8032C00002A9F000000000012D46A, messageQueue=MessageQueue [topic=DEMO_06, brokerName=broker-a, queueId=1], queueOffset=0]]]
2019-12-05 21:04:58.889 INFO 94854 --- [ main] c.i.s.l.r.producer.Demo06ProducerTest : [testSyncSendOrderly][发送编号:[1024] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C728618B4AAC213F934080004, offsetMsgId=C0A8032C00002A9F000000000012D580, messageQueue=MessageQueue [topic=DEMO_06, brokerName=broker-a, queueId=1], queueOffset=1]]]
2019-12-05 21:04:58.891 INFO 94854 --- [ main] c.i.s.l.r.producer.Demo06ProducerTest : [testSyncSendOrderly][发送编号:[1024] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=C0A8032C728618B4AAC213F9340A0006, offsetMsgId=C0A8032C00002A9F000000000012D696, messageQueue=MessageQueue [topic=DEMO_06, brokerName=broker-a, queueId=1], queueOffset=2]]]
# 第一条消息的消费
2019-12-05 21:05:01.647 INFO 94854 --- [MessageThread_1] c.i.s.l.r.consumer.Demo06Consumer : [onMessage][线程编号:69 消息内容:Demo06Message{id=1024}]
2019-12-05 21:05:03.651 INFO 94854 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A8032C728618B4AAC213F934030002 cost: 2005 ms
# 第二条消息的消费
2019-12-05 21:05:03.653 INFO 94854 --- [MessageThread_1] c.i.s.l.r.consumer.Demo06Consumer : [onMessage][线程编号:69 消息内容:Demo06Message{id=1024}]
2019-12-05 21:05:05.654 INFO 94854 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A8032C728618B4AAC213F934080004 cost: 2002 ms
# 第三条消息的消费
2019-12-05 21:05:05.654 INFO 94854 --- [MessageThread_1] c.i.s.l.r.consumer.Demo06Consumer : [onMessage][线程编号:69 消息内容:Demo06Message{id=1024}]
2019-12-05 21:05:07.657 INFO 94854 --- [MessageThread_1] a.r.s.s.DefaultRocketMQListenerContainer : consume C0A8032C728618B4AAC213F9340A0006 cost: 2003 ms
- Producer 发送顺序消息时,因为我们使用
id = 1
作为hashKey
,所以都发送到了 Topic 为"DEMO_06"
,队列编号为 1 的消息队列。 - Consumer 顺序 消费消息时,是在单线程中,顺序消费每条消息。
9. 事务消息
示例代码对应仓库:lab-31-rocketmq-demo 。
在分布式消息队列中,目前唯一提供完整的事务消息的,只有 RocketMQ 。关于这一点,还是可以鼓吹下的。
可能会有胖友怒喷艿艿,RabbitMQ 和 Kafka 也有事务消息啊,也支持发送事务消息的发送,以及后续的事务消息的 commit提交或 rollbackc 回滚。但是要考虑一个极端的情况,在本地数据库事务已经提交的时时候,如果因为网络原因,又或者崩溃等等意外,导致事务消息没有被 commit ,最终导致这条事务消息丢失,分布式事务出现问题。
相比来说,RocketMQ 提供事务回查机制,如果应用超过一定时长未 commit 或 rollback 这条事务消息,RocketMQ 会主动回查应用,询问这条事务消息是 commit 还是 rollback ,从而实现事务消息的状态最终能够被 commit 或是 rollback ,达到最终事务的一致性。
这也是为什么艿艿在上面专门加粗"完整的"三个字的原因。可能上述的描述,对于绝大多数没有了解过分布式事务的胖友,会比较陌生,所以推荐阅读如下两篇文章:
热心的艿艿:虽然说 RabbitMQ、Kafka 并未提供完整的事务消息,但是社区里,已经基于它们之上拓展,提供了事务回查的功能。例如说:Myth ,采用消息队列解决分布式事务的开源框架, 基于 Java 语言来开发(JDK1.8),支持 Dubbo,Spring Cloud,Motan 等 RPC 框架进行分布式事务。
下面,我们开始本小节的示例。
9.1 Demo07Message
在 cn.iocoder.springboot.lab31.rocketmqdemo.message
包下,创建 Demo07Message 消息类,提供给当前示例使用。代码如下:
public class Demo07Message {
public static final String TOPIC = "DEMO_07";
/**
* 编号
*/
private Integer id;
// ... 省略 set/get/toString 方法
}
TOPIC
静态属性,我们设置该消息类对应 Topic 为"DEMO_07"
。
9.2 Demo07Producer
在 cn.iocoder.springboot.lab31.rocketmqdemo.producer
包下,创建 Demo07Producer 类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现发送事务消息。代码如下:
// Demo07Producer.java
@Component
public class Demo07Producer {
private static final String TX_PRODUCER_GROUP = "demo07-producer-group";
@Autowired
private RocketMQTemplate rocketMQTemplate;
public TransactionSendResult sendMessageInTransaction(Integer id) {
// <1> 创建 Demo07Message 消息
Message message = MessageBuilder.withPayload(new Demo07Message().setId(id))
.build();
// <2> 发送事务消息
return rocketMQTemplate.sendMessageInTransaction(TX_PRODUCER_GROUP, Demo07Message.TOPIC, message,
id);
}
}
<1>
处,创建内容为 Demo07Message 的 Spring Messaging Message 消息。<2>
处,调用RocketMQTemplate#sendMessageInTransaction(...)
方法,发送事务消息。我们来看看该方法的方法参数,代码如下:
// RocketMQTemplate.java
/**
* Send Spring Message in Transaction
*
* @param txProducerGroup the validate txProducerGroup name, set null if using the default name
* @param destination destination formats: `topicName:tags`
* @param message message {@link org.springframework.messaging.Message}
* @param arg ext arg
* @return TransactionSendResult
* @throws MessagingException
*/
public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
final Message<?> message, final Object arg) throws MessagingException {
try {
TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
return txProducer.sendMessageInTransaction(rocketMsg, arg);
} catch (MQClientException e) {
throw RocketMQUtil.convert(e);
}
}
- 方法参数
txProducerGroup
,事务消息的生产者分组。因为 RocketMQ 是回查(请求)指定指定生产分组下的 Producer ,从而获得事务消息的状态,所以一定要正确设置。这里,我们设置了"demo07-producer-group"
。 - 方法参数
destination
,消息的 Topic + Tag 。 - 方法参数
message
,消息,没什么特别。 - 方法参数
arg
,后续我们调用本地事务方法的时候,会传入该arg
。如果要传递多个方法参数给本地事务的方法,可以通过数组,例如说Object[]{arg1, arg2, arg3}
这样的形式。
- 方法参数
9.3 TransactionListenerImpl
在 Demo07Producer 类中,创建一个内部类 TransactionListenerImpl ,实现 MQ 事务的监听。代码如下:
// Demo07Producer.java
@RocketMQTransactionListener(txProducerGroup = TX_PRODUCER_GROUP)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return rollback, commit or unknown
logger.info("[executeLocalTransaction][执行本地事务,消息:{} arg:{}]", msg, arg);
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return rollback, commit or unknown
logger.info("[checkLocalTransaction][回查消息:{}]", msg);
return RocketMQLocalTransactionState.COMMIT;
}
}
- 在类上,添加
@RocketMQTransactionListener
注解,声明监听器的是生产者分组是"demo07-producer-group"
的 Producer 发送的事务消息。 - 实现 RocketMQLocalTransactionListener 接口,实现执行本地事务和检查本地事务的方法。
- 实现
#executeLocalTransaction(...)
方法,实现执行本地事务。- 注意,这是一个模板方法 。在调用这个方法之前,RocketMQTemplate 已经使用 Producer 发送了一条事务消息。然后根据该方法执行的返回的 RocketMQLocalTransactionState 结果,提交还是回滚该事务消息。
- 这里,我们为了模拟 RocketMQ 回查 Producer 来获得事务消息的状态,所以返回了
RocketMQLocalTransactionState.UNKNOWN
未知状态。
- 实现
#checkLocalTransaction(...)
方法,检查本地事务。- 在事务消息长事件未被提交或回滚时,RocketMQ 会回查事务消息对应的生产者分组下的 Producer ,获得事务消息的状态。此时,该方法就会被调用。
- 这里,我们直接返回
RocketMQLocalTransactionState.COMMIT
提交状态。
一般来说,有两种方式实现本地事务回查时,返回事务消息的状态。
第一种 ,通过 msg
消息,获得某个业务上的标识或者编号,然后去数据库中查询业务记录,从而判断该事务消息的状态是提交还是回滚。
第二种 ,记录 msg
的事务编号,与事务状态到数据库中。
- 第一步,在
#executeLocalTransaction(...)
方法中,先存储一条id
为msg
的事务编号,状态为RocketMQLocalTransactionState.UNKNOWN
的记录。 - 第二步,调用带有事务的 业务 Service 的方法。在该 Service 方法中,在逻辑都执行成功的情况下,更新
id
为msg
的事务编号,状态变更为RocketMQLocalTransactionState.COMMIT
。这样,我们就可以伴随这个事务的提交,更新id
为msg
的事务编号的记录的状为RocketMQLocalTransactionState.COMMIT
,美滋滋。。 - 第三步,要以
try-catch
的方式,调用业务 Service 的方法。如此,如果发生异常,回滚事务的时候,可以在catch
中,更新id
为msg
的事务编号的记录的状态为RocketMQLocalTransactionState.ROLLBACK
。😭 极端情况下,可能更新失败,则打印 error 日志,告警知道,人工介入。 - 如此三步之后,我们在
#executeLocalTransaction(...)
方法中,就可以通过查找数据库,id
为msg
的事务编号的记录的状态,然后返回。
相比来说,艿艿倾向第二种,实现更加简单通用,对于业务开发者,更加友好。和有几个朋友沟通了下,他们也是采用第二种。
9.4 Demo07Consumer
在 cn.iocoder.springboot.lab31.rocketmqdemo.consumer
包下,创建 Demo03Consumer 类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
@Component
@RocketMQMessageListener(
topic = Demo07Message.TOPIC,
consumerGroup = "demo07-consumer-group-" + Demo07Message.TOPIC
)
public class Demo07Consumer implements RocketMQListener<Demo07Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo07Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
- 和 「3.8 Demo1Consumer」 是一样,就是消费消息。
9.5 简单测试
创建 Demo07ProducerTest 测试类,编写单元测试方法,调用 Demo07Producer 发送事务消息的方式。代码如下:
// Demo07ProducerTest.java
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo07ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo07Producer producer;
@Test
public void testSendMessageInTransaction() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.sendMessageInTransaction(id);
logger.info("[testSendMessageInTransaction][发送编号:[{}] 发送结果:[{}]]", id, result);
// 阻塞等待,保证消费
new CountDownLatch(1).await();
}
}
我们来执行 #testSendMessageInTransaction()
方法,测试发送事务消息。控制台输出如下:
# TransactionListenerImpl 执行 executeLocalTransaction 方法,先执行本地事务的逻辑
2019-12-06 01:23:00.928 INFO 3205 --- [ main] p.Demo07Producer$TransactionListenerImpl : [executeLocalTransaction][执行本地事务,消息:GenericMessage [payload=byte[17], headers={rocketmq_TOPIC=DEMO_07, rocketmq_FLAG=0, id=ce85ed2a-d7ae-9cc6-226d-a8beb2e219ab, contentType=application/json, rocketmq_TRANSACTION_ID=0AAB01730C8518B4AAC214E570BD0002, timestamp=1575480180928}] arg:1575480180]
# Producer 发送事务消息成功,但是因为 executeLocalTransaction 方法返回的是 UNKOWN 状态,所以事务消息并未提交或者回滚
2019-12-06 01:23:00.930 INFO 3205 --- [ main] c.i.s.l.r.producer.Demo07ProducerTest : [testSendMessageInTransaction][发送编号:[1575480180] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=0AAB01730C8518B4AAC214E570BD0002, offsetMsgId=null, messageQueue=MessageQueue [topic=DEMO_07, brokerName=broker-a, queueId=3], queueOffset=38]]]
# RocketMQ Broker 在发送事务消息 30 秒后,发现事务消息还未提交或是回滚,所以回查 Producer 。此时,checkLocalTransaction 方法返回 COMMIT ,所以该事务消息被提交
2019-12-06 01:23:35.155 INFO 3205 --- [pool-1-thread-1] p.Demo07Producer$TransactionListenerImpl : [checkLocalTransaction][回查消息:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=3, TRANSACTION_CHECK_TIMES=1, rocketmq_BORN_TIMESTAMP=1575480180925, rocketmq_TOPIC=DEMO_07, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=0AAB017300002A9F0000000000132AC3, rocketmq_TRANSACTION_ID=0AAB01730C8518B4AAC214E570BD0002, rocketmq_SYS_FLAG=0, id=0fc2f199-25fb-5911-d577-f81b8003f0f8, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.171.1.115, contentType=application/json, timestamp=1575480215155}]]
# 事务消息被提交,所以该消息被 Consumer 消费
2019-12-06 01:23:35.160 INFO 3205 --- [MessageThread_1] c.i.s.l.r.consumer.Demo07Consumer : [onMessage][线程编号:89 消息内容:Demo07Message{id=1575480180}]
- 整个的执行过程,看看艿艿在日志上添加的说明。
9.6 @RocketMQTransactionListener
在 「9.3 TransactionListenerImpl」 中,我们已经使用了 @RocketMQTransactionListener
注解,设置 MQ 事务监听器的信息。具体属性如下:
// RocketMQTransactionListener.java
public @interface RocketMQTransactionListener {
/**
* 事务的生产者分组
*
* Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a
* transactional message with the declared txProducerGroup.
* <p>
* <p>It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class.
*/
String txProducerGroup() default RocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME;
/**
* Set ExecutorService params -- corePoolSize
*/
int corePoolSize() default 1;
/**
* Set ExecutorService params -- maximumPoolSize
*/
int maximumPoolSize() default 1;
/**
* Set ExecutorService params -- keepAliveTime
*/
long keepAliveTime() default 1000 * 60; //60ms
/**
* Set ExecutorService params -- blockingQueueSize
*/
int blockingQueueSize() default 2000;
/**
* The property of "access-key"
*/
String accessKey() default "${rocketmq.producer.access-key}";
/**
* The property of "secret-key"
*/
String secretKey() default "${rocketmq.producer.secret-key}";
}
10. 接入阿里云的消息队列 RocketMQ
在阿里云上,提供消息队列 RocketMQ 服务。那么,我们是否能够使用 RocketMQ-Spring 实现阿里云 RocketMQ 的消息的发送与消费呢?
答案是可以 。在 《阿里云 ------ 消息队列 MQ ------ 开源 Java SDK 接入说明》 中,提到目前开源的 Java SDK 可以接入阿里云 RocketMQ 服务。
如果您已使用开源 Java SDK 进行生产,只需参考方法,重新配置参数,即可实现无缝上云。
前提条件
- 已在阿里云 MQ 控制台创建资源,包括 Topic、Group ID(GID)、接入点(Endpoint),以及 AccessKeyId 和 AccessKeySecret。
- 已下载开源 RocketMQ 4.5.1 或以上版本,以支持连接阿里云 MQ。
这里,艿艿创建了 lab-31-rocketmq-ons 示例项目,使用 RocketMQ-Spring 接入阿里云。重点的差异,就在 application.yaml
配置文件,配置如下:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # 阿里云 RocketMQ Namesrv
access-channel: CLOUD # 设置使用阿里云
# Producer 配置项
producer:
group: GID_PRODUCER_GROUP_YUNAI_TEST # 生产者分组
access-key: # 设置阿里云的 RocketMQ 的 access key !!!这里涉及到隐私,所以这里艿艿没有提供
secret-key: # 设置阿里云的 RocketMQ 的 secret key !!!这里涉及到隐私,所以这里艿艿没有提供
- 重点,就是设置了
rocketmq.access-channel=CLOUD
,访问阿里云 RocketMQ 服务。
从个人使用感受上来说,RocketMQ 提供的特性,可能是最为丰富的,可以说是最适合业务团队的分布式消息队列。艿艿是从 2013 年开始用 RocketMQ 的,主要踩的坑,都是自己错误使用导致的。例如说:
- 刚开始略微抠门,只搭建了 RocketMQ 一主一从集群,结果恰好倒霉,不小心挂了主。
- 多个 Topic 公用一个消费者集群,导致使用相同线程池。结果,嘿~有个消费逻辑需要调用第三方服务,某一天突然特别慢,导致消费积压,进而整个线程池堵塞。
- 相同消费者分组,订阅了不同的 Topic ,导致相互覆盖。
如果胖友在使用阿里云的话,建议量级较小的情况下,可以考虑先使用 阿里云 ------ 消息队列 MQ 服务 。毕竟搭建一个高可用的 RocketMQ 量主两从的集群,最最最起码要两个 ECS 节点。同时,需要一定的维护和监控成本。😈 我们目前有个项目,就是直接使用阿里云的消息队列服务。
消息队列是非常重要的组件,推荐阅读下 RocketMQ 的最佳实践:
另外,如下官方文档,建议通读 + 通读 + 通断:
- 《RocketMQ 用户指南》 基于 RocketMQ 3 的版本。
- 《RocketMQ 原理简介》 基于 RocketMQ 3 的版本。
- 《RocketMQ 最佳实践》 基于 RocketMQ 3 的版本。
- 《RocketMQ 开发者指南》 基于 RocketMQ 4 的版本。
- 《阿里云 ------ 消息队列 MQ》 阿里云的消息队列,就是 RocketMQ 的云服务。
这里,在额外推荐一些内容:
- 《RocketMQ 入门 ------ 原理与实践》 ,一文快速了解 RocketMQ 的原理与实践,非常不错,篇幅也在可接受的范围之内。
- 《性能测试 ------ RocketMQ 基准测试》 ,消息消息队列是我们非常重要的性能优化手段,那么到底它的性能有多强,何不上手测试一波~
- 《RocketMQ 源码解析系列》 ,知其然,知其所以然。RocketMQ 是艿艿第一个特别完全看完的开源中间件,收获颇丰。
来源:https://blog.csdn.net/weixin_42073629/article/details/106225408