消息队列在生产环境里挖的坑,比你踩过的所有bug加起来还多
先说个真事。有个团队接入了RabbitMQ处理订单异步通知,上线半年风平浪静,直到某天大促——订单消息积压了 30 万条,消费者处理速度跟不上,监控告警满天飞,研发一顿手忙脚乱重启服务,结果消息被重复消费,3000 个用户收到了重复发货通知。
这不是故事,这是事故。
消息队列是现代后端架构的基建,几乎每个正经系统都会用到。但有意思的是,大多数团队对它的理解还停留在「装个RabbitMQ/RocketMQ,把同步调用改成异步」这个层面。真正踩过生产事故的,才知道这玩意儿有多少匪夷所思的坑。
今天来盘一盘,那些消息队列在生产环境里挖的深坑,以及怎么真正填上它。
坑一:消息丢了,你根本不知道
这是最常见也最致命的问题。消息发送出去了,消费者说没收到,两边各执一词——到底是谁的锅?
大多数团队写消息发送代码是这样的:
channel.basicPublish("order.exchange", "order.created", message);
然后打完收工,觉得消息已经稳稳当当地进了队列。
但实际上,在确认机制(Publisher Confirm)没开启的情况下,消息可能还在内存里没落盘就丢了,你根本无从得知。
正确的做法是开启 Publisher Confirm 模式:
// RabbitMQ Java Client
channel.confirmSelect();
channel.basicPublish("exchange", "routingKey",
MessageProperties.PERSISTENT_TEXT_PLAIN, body);
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// 消息已被Broker确认接收
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
// 消息未被确认,需要重试或告警
}
});
同样的,消费者端也要手动确认(ACK),不能以为框架会自动做好。如果你用的是 Spring Boot + RabbitMQ,默认的 AUTO ACK 模式会在消息投递给消费者时立即确认——如果消费者处理到一半就崩溃了,消息就丢了,凉了。
改成手动确认:
@RabbitListener(queues = "order.queue")
public void handleOrder(Message message, Channel channel) {
try {
processOrder(message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,requeue=false 防止无限循环
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
生产环境里,Publisher Confirm + Consumer ACK,二者缺一不可。别省这点代码,代价是你在凌晨三点接到电话。
坑二:重复消费——幂等是金线,不是可选项
上一段那个大促事故,根本原因就是消息被重复消费了。消费者重启后,某些消息被重新投递,原来的处理逻辑没有幂等保护,于是重复执行了副作用操作。
很多人有个误解,以为消息队列应该保证「exactly-once」,就像数据库事务一样。现实是:这是不可能的。在分布式系统里,你想做到完全不丢消息,就必须容忍重复;你想做到完全不重复,就必须容忍丢失。二选一,没有银弹。
工业界的标准解法是:at-least-once + 幂等消费者。
也就是说,消息至少被处理一次(允许重复),但消费者要保证无论处理多少次,结果都是一致的。
实现幂等的几种思路:
1. 业务去重表。 把消息 ID 存入一张去重表,用数据库唯一索引保证不重复。
// 以订单消息举例
public void processOrder(OrderMessage msg) {
String msgId = msg.getMessageId();
// 尝试插入去重表
int affected = orderMapper.insertDeduplication(msgId);
if (affected == 0) {
// 说明这条消息已经处理过了,直接返回
return;
}
// 正常处理订单业务
doProcessOrder(msg);
}
2. 乐观锁版本号。 更新业务数据时带上消息 ID 作为版本条件。
UPDATE orders SET status = 'PAID', version = version + 1
WHERE order_id = ? AND version = ? AND msg_id = ?
3. Redis 去重。 用 SETNX 操作,消息 ID 作为 key,过期时间设为消息处理的预估最大耗时。
String key = "msg:dedup:" + msgId;
Boolean locked = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMinutes(30));
if (!locked) {
return; // 已处理过
}
选哪种,看你的业务场景和数据一致性要求。但无论选哪种,幂等必须在系统设计之初就定下来,而不是上线后出了问题再打补丁。
坑三:消息顺序——你的 FIFO 可能是个谎言
很多人以为消息队列是 FIFO 的,先发先到,天经地义。但实际上,在大多数消息中间件里,顺序保证是有条件的、有限制的。
以 RabbitMQ 为例:只有同一个队列里的消息才是有序的。如果你有多个消费者并行处理,消息的到达顺序完全取决于哪个消费者先空闲,而不是谁先发布。
更坑的是:RabbitMQ 默认的消息分配策略是 round-robin,消息会轮发给不同的消费者。这意味着,如果你有 3 个消费者实例,消息 1、2、3、4、5 依次发布——消息 1 可能被消费者 C 处理,消息 2 被消费者 A 处理,消息 3 被消费者 B 处理……并发场景下,处理完成的顺序完全不可控。
如果你真的需要严格保序,解决方案是:一个分区内只用一个消费者。
Kafka 的话,用单分区 + 单消费者组,就能保证严格有序。但这样会牺牲吞吐量。所以现实中的权衡是:按业务 ID 做分区,同一个业务 ID 的消息进同一个分区,保证局部有序。
// Kafka 生产端按 orderId 分区
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-topic",
orderId, // 用订单ID做key,保证同一订单消息进同一分区
orderJson
);
但这里又有一个陷阱:如果你用订单 ID 做分区,而订单 ID 是自增的(比如 1, 2, 3, 4, 5),Kafka 默认的分区策略会把这些消息全发进同一个分区(因为 key 的 hash 一致)——反而把压力集中到单个分区上,成了新的瓶颈。
所以,分区策略的设计要结合业务 ID 的分布特征,不能想当然。
坑四:消息积压——你以为的「异步」其实是定时炸弹
消息积压是生产环境里最常见的问题之一。上游服务疯狂发消息,下游消费不过来,队列越来越长,内存越来越吃紧,最后要么 OOM,要么磁盘爆了。
但更可怕的是,很多人根本不知道自己的队列已经积压了——因为他们没配监控。
配监控,有两个核心指标必须看:
1. 队列深度(Queue Depth):队列里有多少条消息待消费。超过阈值(比如 1 万条)就要告警。
2. 消费延迟(Consumer Lag):消息从入队到被消费的时间差。如果 lag 持续增长,说明消费速度跟不上生产速度。
积压的应急处理:
第一,快恢优先——临时扩容消费者实例,撑过峰值。
第二,消息截断——如果积压已经严重影响实时性,可以考虑放弃部分积压消息(DLQ,死信队列),先恢复服务,再排查原因。这不是优雅的方案,但有时候是最有效的方案。
第三,源头限流——从生产端做流量控制,控制消息发送速率。这是根本解法,但需要业务方配合。
大多数积压问题,归根结底是两个原因:要么消费端代码有 BUG(比如某条消息处理超时,阻塞了后续),要么容量规划没做好,上下游速度不匹配。两者都要防,但后者是架构设计阶段就要解决的,不是靠写代码能修的。
坑五:重试机制——你以为的救命稻草可能是连环杀手
消息处理失败了,重试一下,看起来很合理。但重试机制如果没设计好,分分钟把一个小故障放大成一个系统级灾难。
想象这个场景:下游某个接口响应变慢了,超时了,消息处理失败,触发重试——重试的消息再次打到下游,下游压力更大,响应更慢,更多超时,更多重试……这就是经典的重试风暴(Retry Storm)。
解法是:指数退避 + 熔断。
// 指数退避重试
int retryCount = 0;
while (retryCount < maxRetries) {
try {
callDownstream();
break;
} catch (Exception e) {
retryCount++;
long delay = (long) Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s, 8s...
Thread.sleep(delay);
}
}
但指数退避只解决了一部分问题。如果下游持续不可用,重试只会浪费资源。这时候需要一个更果断的机制:熔断器(Circuit Breaker)。当失败率超过阈值时,打开熔断器,后续请求直接返回错误或走降级逻辑,不再继续请求已经崩溃的下游。
很多团队用的是 Resilience4j 或 Sentinel,这些库都内置了熔断器的实现。
@CircuitBreaker(name = "downstream", fallbackMethod = "fallback")
public String callDownstream(String param) {
return restTemplate.getForObject("http://downstream/api", String.class);
}
public String fallback(String param, Exception e) {
// 熔断打开时走降级逻辑
return "系统繁忙,请稍后再试";
}
同时,重试队列要单独设置,不能和主业务队列混在一起——防止重试消息把正常消费的资源抢走。
总结:消息队列不是万能异步药
消息队列是好东西,但它不是银弹,不是装上就高枕无忧的万能药。
它引入的复杂度是:消息持久化、消费者管理、幂等性保证、顺序保证、积压监控、重试策略、死信处理……每一个环节没做好,都可能在生产环境里炸你一脸。
用消息队列之前,先问自己三个问题:
1. 我的业务能容忍消息丢失吗?
2. 我的业务能容忍消息重复吗?
3. 我的业务需要严格的消息顺序吗?
这三个问题的答案,决定了你需要用多重的手段来保护你的消息队列,也决定了你后续要踩多少坑。
那些在设计阶段没想清楚这些问题就上消息队列的团队,最后都花了大把时间在运维和救火上。而那些真正理解消息队列语义、在架构层面做好规划的团队,才能真正享受异步带来的红利。
共勉,别当那个凌晨三点被叫起来修消息队列的研发。