Appearance
RocketMQ消息服务
RocketMQ 是阿里巴巴开源的分布式消息中间件,是一款低延迟、高可靠、可伸缩、易于使用的消息队列,被广泛应用于异步通信、削峰填谷、解耦等场景。Spring Cloud Alibaba 提供了 RocketMQ 的集成,使其能够轻松地与 Spring Cloud 应用集成。
RocketMQ 基本概念
在深入了解 Spring Cloud Alibaba RocketMQ 之前,首先需要理解 RocketMQ 的几个核心概念:
1. 消息模型
- Topic:消息主题,消息的逻辑分类,一个 Topic 可以有多个 Producer 和 Consumer
- Message:消息,消息队列中传输的数据
- Tag:消息标签,用于同一主题下消息的过滤
- Group:消费者组,消费同一类消息的消费者集合
2. 部署架构
- Name Server:注册中心,存储 Topic 和 Broker 的路由信息
- Broker:消息服务器,负责消息的存储、投递和查询
- Producer:消息生产者,负责产生消息,一般由业务系统负责产生
- Consumer:消息消费者,负责消费消息,一般是后台系统
3. 消息类型
- 普通消息:单向消息、同步消息、异步消息
- 顺序消息:保证消息的顺序性
- 事务消息:确保生产者本地事务与发送消息的原子性
- 延时消息:消息发送后,不会立即投递,而是在指定时间后才投递到消费者
- 批量消息:一次性发送多条消息
安装与启动 RocketMQ
方式一:下载二进制包
从 RocketMQ 官方网站 下载二进制包
解压后配置环境变量:
bashexport ROCKETMQ_HOME=/path/to/rocketmq
启动 Name Server:
bashnohup sh bin/mqnamesrv &
启动 Broker:
bashnohup sh bin/mqbroker -n localhost:9876 &
方式二:使用 Docker
使用 Docker Compose 启动 RocketMQ:
yaml
version: '3'
services:
namesrv:
image: apache/rocketmq-namesrv:latest
container_name: rocketmq-namesrv
ports:
- 9876:9876
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
environment:
- MAX_HEAP_SIZE=256M
- HEAP_NEWSIZE=128M
broker:
image: apache/rocketmq-broker:latest
container_name: rocketmq-broker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./data/broker/conf/broker.conf:/home/rocketmq/conf/broker.conf
environment:
- NAMESRV_ADDR=namesrv:9876
- MAX_HEAP_SIZE=512M
- HEAP_NEWSIZE=256M
depends_on:
- namesrv
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rocketmq-dashboard
ports:
- 8080:8080
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
depends_on:
- namesrv
创建 broker.conf
配置文件:
properties
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
集成 Spring Cloud Alibaba RocketMQ
1. 添加依赖
xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
2. 配置 RocketMQ
在 application.yml
中添加 RocketMQ 配置:
yaml
spring:
cloud:
stream:
function:
definition: consumer;supply
bindings:
consumer-in-0:
destination: test-topic
group: test-group
supply-out-0:
destination: test-topic
rocketmq:
binder:
name-server: 127.0.0.1:9876 # RocketMQ Name Server 地址
bindings:
consumer-in-0:
consumer:
orderly: true # 顺序消费
broadcasting: false # 是否广播模式
supply-out-0:
producer:
group: test-producer-group # 生产者组
sync: true # 是否同步发送
3. 生产者示例
使用 Spring Cloud Stream 的函数式编程模型发送消息:
java
@Component
public class MessageProducer {
private final StreamBridge streamBridge;
public MessageProducer(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
// 直接使用 StreamBridge 发送消息
public boolean sendMessage(String message) {
return streamBridge.send("supply-out-0", message);
}
// 使用 Supplier 接口发送消息
@Bean
public Supplier<Flux<String>> supply() {
return () -> Flux.interval(Duration.ofSeconds(1))
.map(l -> "Hello World " + l);
}
}
4. 消费者示例
使用函数式编程模型消费消息:
java
@Component
public class MessageConsumer {
private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);
@Bean
public Consumer<String> consumer() {
return message -> log.info("Received message: {}", message);
}
}
5. 发送各种类型的消息
普通消息
java
@Service
public class MessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 同步发送消息
public SendResult syncSend(String topic, String message) {
return rocketMQTemplate.syncSend(topic, message);
}
// 异步发送消息
public void asyncSend(String topic, String message) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("Async send message success: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("Async send message failed", throwable);
}
});
}
// 单向发送消息(不关心结果)
public void sendOneWay(String topic, String message) {
rocketMQTemplate.sendOneWay(topic, message);
}
}
顺序消息
顺序消息是指消息的消费顺序与发送顺序保持一致。RocketMQ 提供了"分区顺序消息",即保证同一分区内的消息顺序,而不同分区之间的消息无序。
java
@Service
public class OrderlyMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送顺序消息(使用哈希选择队列)
public SendResult sendOrderlyMessage(String topic, String message, String hashKey) {
return rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}
}
配置消费者顺序消费:
yaml
spring:
cloud:
stream:
rocketmq:
bindings:
consumer-in-0:
consumer:
orderly: true # 顺序消费
延时消息
RocketMQ 支持发送延时消息,目前支持以下级别的延时:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。
java
@Service
public class DelayedMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送延时消息
public SendResult sendDelayedMessage(String topic, String message, int delayLevel) {
Message<String> msg = MessageBuilder.withPayload(message).build();
return rocketMQTemplate.syncSend(topic, msg, 3000, delayLevel);
}
}
事务消息
事务消息可以确保本地事务和消息发送的原子性,常用于分布式事务场景。
java
@Service
public class TransactionMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送事务消息
public void sendTransactionMessage(String topic, String message, Object arg) {
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), arg);
log.info("Transaction message send result: {}", result);
}
}
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 业务逻辑...
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
// 如果消息未收到确认,MQ会定期调用此方法,确认事务状态
// 1. 检查本地事务执行结果
// 2. 根据结果返回事务状态
return RocketMQLocalTransactionState.COMMIT;
}
}
批量消息
批量消息可以提高传输效率,一次性发送多条消息。
java
@Service
public class BatchMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 发送批量消息
public SendResult sendBatchMessages(String topic, List<String> messages) {
List<Message<String>> messageList = messages.stream()
.map(msg -> MessageBuilder.withPayload(msg).build())
.collect(Collectors.toList());
return rocketMQTemplate.syncSend(topic, messageList);
}
}
高级特性与配置
1. 消息过滤
RocketMQ 支持基于 Tag 和 SQL 表达式的消息过滤。
Tag 过滤配置:
yaml
spring:
cloud:
stream:
rocketmq:
bindings:
consumer-in-0:
consumer:
tags: tagA || tagB # 接收 tagA 或 tagB 的消息
发送带 Tag 的消息:
java
public void sendMessageWithTag(String topic, String tag, String message) {
rocketMQTemplate.syncSend(topic + ":" + tag, message);
}
SQL 过滤配置:
yaml
spring:
cloud:
stream:
rocketmq:
bindings:
consumer-in-0:
consumer:
sql: "a between 0 and 3" # SQL 过滤表达式
发送带属性的消息:
java
public void sendMessageWithProperty(String topic, String message) {
Message<String> msg = MessageBuilder
.withPayload(message)
.setHeader("a", 1)
.build();
rocketMQTemplate.syncSend(topic, msg);
}
2. 消息重试
当消息消费失败时,RocketMQ 支持消息重试机制。
yaml
spring:
cloud:
stream:
rocketmq:
bindings:
consumer-in-0:
consumer:
delayLevelWhenNextConsume: 1 # 重试等级
maxReconsumeTimes: 3 # 最大重试次数
3. 并发消费
RocketMQ 支持并发消费消息,提高消费效率:
yaml
spring:
cloud:
stream:
rocketmq:
bindings:
consumer-in-0:
consumer:
orderly: false # 非顺序消费(并发消费)
consumeThreadMin: 20 # 最小消费线程数
consumeThreadMax: 64 # 最大消费线程数
4. 集群消费与广播消费
RocketMQ 支持集群消费(默认模式)和广播消费两种模式:
- 集群消费:同一个消费组中的消费者平均分摊消息
- 广播消费:同一个消费组中的每个消费者都接收所有消息
配置广播消费:
yaml
spring:
cloud:
stream:
rocketmq:
bindings:
consumer-in-0:
consumer:
broadcasting: true # 广播消费
5. 消息持久化
RocketMQ 默认提供了高可靠的消息持久化机制,可以通过 Broker 配置文件调整持久化策略:
properties
# 同步刷盘
flushDiskType = SYNC_FLUSH
# 异步刷盘(默认)
flushDiskType = ASYNC_FLUSH
最佳实践
1. 生产者最佳实践
- 合理设置消息发送超时时间,避免消息发送过慢影响业务
- 使用异步发送提高发送性能,但要注意处理异步回调
- 批量发送合理使用,以提高传输效率
- 避免发送大消息,消息体应控制在 4MB 以内
- 为消息设置 Tag,便于消费者过滤
- 设置消息重试策略,防止消息发送失败
2. 消费者最佳实践
- 保证消费幂等性,防止消息重复消费导致业务错误
- 合理设置消费线程数,根据业务特点和机器配置调整
- 合理选择消费模式,顺序敏感场景使用顺序消费,高吞吐场景使用并发消费
- 处理消费异常,避免因消费异常导致消息积压
- 及时提交消费位点,防止重复消费
- 设置合理的重试策略,避免无效重试消耗资源
3. 运维最佳实践
- 合理规划硬件资源,特别是磁盘和内存
- 定期清理过期消息,防止磁盘占用过大
- 监控消息堆积,及时发现消费异常
- 备份重要消息,防止数据丢失
- 使用集群部署,提高可用性
监控与运维
1. RocketMQ Dashboard
RocketMQ 提供了 Dashboard 工具,用于监控和管理 RocketMQ 集群,包括以下功能:
- Topic 管理
- 消费者管理
- 消息查询
- 集群监控
- 运行状态查看
2. 整合 Prometheus 和 Grafana
RocketMQ 支持通过 JMX 暴露监控指标,可以整合 Prometheus 和 Grafana 构建强大的监控系统。
常见问题与解决方案
1. 消息丢失问题
可能原因:
- 生产者发送失败
- Broker 宕机,消息未持久化
- 消费者消费后,未正确提交消费位点
解决方案:
- 使用同步发送或异步发送时处理发送结果
- 配置 Broker 主从架构,启用同步刷盘
- 确保消费成功后再提交消费位点
2. 消息重复消费
可能原因:
- 生产者重试发送
- 消费者处理成功,但提交消费位点失败
解决方案:
- 业务层实现幂等性处理,如唯一索引、去重表、状态检查等
- 使用分布式锁避免重复处理
3. 消息积压
可能原因:
- 消费速度慢于生产速度
- 消费者处理消息异常
- 消费者数量不足
解决方案:
- 增加消费者数量或实例
- 优化消费逻辑,提高处理效率
- 增加消费线程数
- 临时扩容,消费积压消息
4. 顺序消息错乱
可能原因:
- 消息发送时未使用相同的分区键
- 消费者配置不正确,未启用顺序消费
解决方案:
- 确保相关的消息使用相同的分区键发送
- 配置消费者为顺序消费模式
总结
RocketMQ 作为 Spring Cloud Alibaba 的重要组件,为微服务架构提供了强大的消息服务能力。它具有高性能、高可靠性、丰富的功能特性,适用于各种复杂的业务场景。通过与 Spring Cloud Stream 的无缝集成,开发者可以使用统一的编程模型来处理消息,大大简化了消息队列的使用成本。
在实际应用中,选择合适的消息类型、合理配置生产者和消费者参数、实现幂等消费、做好监控运维等最佳实践,可以充分发挥 RocketMQ 的优势,构建高可靠、高性能的分布式系统。