Skip to content

RocketMQ消息服务

RocketMQ 是阿里巴巴开源的分布式消息中间件,是一款低延迟、高可靠、可伸缩、易于使用的消息队列,被广泛应用于异步通信、削峰填谷、解耦等场景。Spring Cloud Alibaba 提供了 RocketMQ 的集成,使其能够轻松地与 Spring Cloud 应用集成。

RocketMQ 基本概念

在深入了解 Spring Cloud Alibaba RocketMQ 之前,首先需要理解 RocketMQ 的几个核心概念:

1. 消息模型

  • Topic:消息主题,消息的逻辑分类,一个 Topic 可以有多个 Producer 和 Consumer
  • Message:消息,消息队列中传输的数据
  • Tag:消息标签,用于同一主题下消息的过滤
  • Group:消费者组,消费同一类消息的消费者集合

2. 部署架构

  • Name Server:注册中心,存储 Topic 和 Broker 的路由信息
  • Broker:消息服务器,负责消息的存储、投递和查询
  • Producer:消息生产者,负责产生消息,一般由业务系统负责产生
  • Consumer:消息消费者,负责消费消息,一般是后台系统

3. 消息类型

  • 普通消息:单向消息、同步消息、异步消息
  • 顺序消息:保证消息的顺序性
  • 事务消息:确保生产者本地事务与发送消息的原子性
  • 延时消息:消息发送后,不会立即投递,而是在指定时间后才投递到消费者
  • 批量消息:一次性发送多条消息

安装与启动 RocketMQ

方式一:下载二进制包

  1. RocketMQ 官方网站 下载二进制包

  2. 解压后配置环境变量:

    bash
    export ROCKETMQ_HOME=/path/to/rocketmq
  3. 启动 Name Server:

    bash
    nohup sh bin/mqnamesrv &
  4. 启动 Broker:

    bash
    nohup sh bin/mqbroker -n localhost:9876 &

方式二:使用 Docker

使用 Docker Compose 启动 RocketMQ:

yaml
version: '3'
services:
  namesrv:
    image: apache/rocketmq-namesrv:latest
    container_name: rocketmq-namesrv
    ports:
      - 9876:9876
    volumes:
      - ./data/namesrv/logs:/home/rocketmq/logs
    environment:
      - MAX_HEAP_SIZE=256M
      - HEAP_NEWSIZE=128M

  broker:
    image: apache/rocketmq-broker:latest
    container_name: rocketmq-broker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./data/broker/logs:/home/rocketmq/logs
      - ./data/broker/store:/home/rocketmq/store
      - ./data/broker/conf/broker.conf:/home/rocketmq/conf/broker.conf
    environment:
      - NAMESRV_ADDR=namesrv:9876
      - MAX_HEAP_SIZE=512M
      - HEAP_NEWSIZE=256M
    depends_on:
      - namesrv

  dashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    container_name: rocketmq-dashboard
    ports:
      - 8080:8080
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876
    depends_on:
      - namesrv

创建 broker.conf 配置文件:

properties
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

集成 Spring Cloud Alibaba RocketMQ

1. 添加依赖

xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

2. 配置 RocketMQ

application.yml 中添加 RocketMQ 配置:

yaml
spring:
  cloud:
    stream:
      function:
        definition: consumer;supply
      bindings:
        consumer-in-0:
          destination: test-topic
          group: test-group
        supply-out-0:
          destination: test-topic
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876  # RocketMQ Name Server 地址
        bindings:
          consumer-in-0:
            consumer:
              orderly: true  # 顺序消费
              broadcasting: false  # 是否广播模式
          supply-out-0:
            producer:
              group: test-producer-group  # 生产者组
              sync: true  # 是否同步发送

3. 生产者示例

使用 Spring Cloud Stream 的函数式编程模型发送消息:

java
@Component
public class MessageProducer {
    
    private final StreamBridge streamBridge;
    
    public MessageProducer(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }
    
    // 直接使用 StreamBridge 发送消息
    public boolean sendMessage(String message) {
        return streamBridge.send("supply-out-0", message);
    }
    
    // 使用 Supplier 接口发送消息
    @Bean
    public Supplier<Flux<String>> supply() {
        return () -> Flux.interval(Duration.ofSeconds(1))
                        .map(l -> "Hello World " + l);
    }
}

4. 消费者示例

使用函数式编程模型消费消息:

java
@Component
public class MessageConsumer {
    
    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);
    
    @Bean
    public Consumer<String> consumer() {
        return message -> log.info("Received message: {}", message);
    }
}

5. 发送各种类型的消息

普通消息

java
@Service
public class MessageService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    // 同步发送消息
    public SendResult syncSend(String topic, String message) {
        return rocketMQTemplate.syncSend(topic, message);
    }
    
    // 异步发送消息
    public void asyncSend(String topic, String message) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("Async send message success: {}", sendResult);
            }
            
            @Override
            public void onException(Throwable throwable) {
                log.error("Async send message failed", throwable);
            }
        });
    }
    
    // 单向发送消息(不关心结果)
    public void sendOneWay(String topic, String message) {
        rocketMQTemplate.sendOneWay(topic, message);
    }
}

顺序消息

顺序消息是指消息的消费顺序与发送顺序保持一致。RocketMQ 提供了"分区顺序消息",即保证同一分区内的消息顺序,而不同分区之间的消息无序。

java
@Service
public class OrderlyMessageService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    // 发送顺序消息(使用哈希选择队列)
    public SendResult sendOrderlyMessage(String topic, String message, String hashKey) {
        return rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
    }
}

配置消费者顺序消费:

yaml
spring:
  cloud:
    stream:
      rocketmq:
        bindings:
          consumer-in-0:
            consumer:
              orderly: true  # 顺序消费

延时消息

RocketMQ 支持发送延时消息,目前支持以下级别的延时:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。

java
@Service
public class DelayedMessageService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    // 发送延时消息
    public SendResult sendDelayedMessage(String topic, String message, int delayLevel) {
        Message<String> msg = MessageBuilder.withPayload(message).build();
        return rocketMQTemplate.syncSend(topic, msg, 3000, delayLevel);
    }
}

事务消息

事务消息可以确保本地事务和消息发送的原子性,常用于分布式事务场景。

java
@Service
public class TransactionMessageService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    // 发送事务消息
    public void sendTransactionMessage(String topic, String message, Object arg) {
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), arg);
        log.info("Transaction message send result: {}", result);
    }
}

@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        try {
            // 业务逻辑...
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 检查本地事务状态
        // 如果消息未收到确认,MQ会定期调用此方法,确认事务状态
        // 1. 检查本地事务执行结果
        // 2. 根据结果返回事务状态
        return RocketMQLocalTransactionState.COMMIT;
    }
}

批量消息

批量消息可以提高传输效率,一次性发送多条消息。

java
@Service
public class BatchMessageService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    // 发送批量消息
    public SendResult sendBatchMessages(String topic, List<String> messages) {
        List<Message<String>> messageList = messages.stream()
                .map(msg -> MessageBuilder.withPayload(msg).build())
                .collect(Collectors.toList());
        return rocketMQTemplate.syncSend(topic, messageList);
    }
}

高级特性与配置

1. 消息过滤

RocketMQ 支持基于 Tag 和 SQL 表达式的消息过滤。

Tag 过滤配置:

yaml
spring:
  cloud:
    stream:
      rocketmq:
        bindings:
          consumer-in-0:
            consumer:
              tags: tagA || tagB  # 接收 tagA 或 tagB 的消息

发送带 Tag 的消息:

java
public void sendMessageWithTag(String topic, String tag, String message) {
    rocketMQTemplate.syncSend(topic + ":" + tag, message);
}

SQL 过滤配置:

yaml
spring:
  cloud:
    stream:
      rocketmq:
        bindings:
          consumer-in-0:
            consumer:
              sql: "a between 0 and 3"  # SQL 过滤表达式

发送带属性的消息:

java
public void sendMessageWithProperty(String topic, String message) {
    Message<String> msg = MessageBuilder
            .withPayload(message)
            .setHeader("a", 1)
            .build();
    rocketMQTemplate.syncSend(topic, msg);
}

2. 消息重试

当消息消费失败时,RocketMQ 支持消息重试机制。

yaml
spring:
  cloud:
    stream:
      rocketmq:
        bindings:
          consumer-in-0:
            consumer:
              delayLevelWhenNextConsume: 1  # 重试等级
              maxReconsumeTimes: 3  # 最大重试次数

3. 并发消费

RocketMQ 支持并发消费消息,提高消费效率:

yaml
spring:
  cloud:
    stream:
      rocketmq:
        bindings:
          consumer-in-0:
            consumer:
              orderly: false  # 非顺序消费(并发消费)
              consumeThreadMin: 20  # 最小消费线程数
              consumeThreadMax: 64  # 最大消费线程数

4. 集群消费与广播消费

RocketMQ 支持集群消费(默认模式)和广播消费两种模式:

  • 集群消费:同一个消费组中的消费者平均分摊消息
  • 广播消费:同一个消费组中的每个消费者都接收所有消息

配置广播消费:

yaml
spring:
  cloud:
    stream:
      rocketmq:
        bindings:
          consumer-in-0:
            consumer:
              broadcasting: true  # 广播消费

5. 消息持久化

RocketMQ 默认提供了高可靠的消息持久化机制,可以通过 Broker 配置文件调整持久化策略:

properties
# 同步刷盘
flushDiskType = SYNC_FLUSH

# 异步刷盘(默认)
flushDiskType = ASYNC_FLUSH

最佳实践

1. 生产者最佳实践

  • 合理设置消息发送超时时间,避免消息发送过慢影响业务
  • 使用异步发送提高发送性能,但要注意处理异步回调
  • 批量发送合理使用,以提高传输效率
  • 避免发送大消息,消息体应控制在 4MB 以内
  • 为消息设置 Tag,便于消费者过滤
  • 设置消息重试策略,防止消息发送失败

2. 消费者最佳实践

  • 保证消费幂等性,防止消息重复消费导致业务错误
  • 合理设置消费线程数,根据业务特点和机器配置调整
  • 合理选择消费模式,顺序敏感场景使用顺序消费,高吞吐场景使用并发消费
  • 处理消费异常,避免因消费异常导致消息积压
  • 及时提交消费位点,防止重复消费
  • 设置合理的重试策略,避免无效重试消耗资源

3. 运维最佳实践

  • 合理规划硬件资源,特别是磁盘和内存
  • 定期清理过期消息,防止磁盘占用过大
  • 监控消息堆积,及时发现消费异常
  • 备份重要消息,防止数据丢失
  • 使用集群部署,提高可用性

监控与运维

1. RocketMQ Dashboard

RocketMQ 提供了 Dashboard 工具,用于监控和管理 RocketMQ 集群,包括以下功能:

  • Topic 管理
  • 消费者管理
  • 消息查询
  • 集群监控
  • 运行状态查看

2. 整合 Prometheus 和 Grafana

RocketMQ 支持通过 JMX 暴露监控指标,可以整合 Prometheus 和 Grafana 构建强大的监控系统。

常见问题与解决方案

1. 消息丢失问题

可能原因:

  • 生产者发送失败
  • Broker 宕机,消息未持久化
  • 消费者消费后,未正确提交消费位点

解决方案:

  • 使用同步发送或异步发送时处理发送结果
  • 配置 Broker 主从架构,启用同步刷盘
  • 确保消费成功后再提交消费位点

2. 消息重复消费

可能原因:

  • 生产者重试发送
  • 消费者处理成功,但提交消费位点失败

解决方案:

  • 业务层实现幂等性处理,如唯一索引、去重表、状态检查等
  • 使用分布式锁避免重复处理

3. 消息积压

可能原因:

  • 消费速度慢于生产速度
  • 消费者处理消息异常
  • 消费者数量不足

解决方案:

  • 增加消费者数量或实例
  • 优化消费逻辑,提高处理效率
  • 增加消费线程数
  • 临时扩容,消费积压消息

4. 顺序消息错乱

可能原因:

  • 消息发送时未使用相同的分区键
  • 消费者配置不正确,未启用顺序消费

解决方案:

  • 确保相关的消息使用相同的分区键发送
  • 配置消费者为顺序消费模式

总结

RocketMQ 作为 Spring Cloud Alibaba 的重要组件,为微服务架构提供了强大的消息服务能力。它具有高性能、高可靠性、丰富的功能特性,适用于各种复杂的业务场景。通过与 Spring Cloud Stream 的无缝集成,开发者可以使用统一的编程模型来处理消息,大大简化了消息队列的使用成本。

在实际应用中,选择合适的消息类型、合理配置生产者和消费者参数、实现幂等消费、做好监控运维等最佳实践,可以充分发挥 RocketMQ 的优势,构建高可靠、高性能的分布式系统。