RabbitMQ 死信队列
1. 概念
无法被消费的消息被称为死信
,存放死信
的队列也就是死信队列
。由于某些特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信
。例如消费消息时,发生异常,经过一定次数的重试后,该消息依然无法被正常消费,此时可以将该消息放入死信队列中,后续进行人工干预。
消息成为死信的三种情况:
- 队列消息长度到达限制
- 消费者异常拒接消费消息
- 原队列存在消息过期设置,消息到达超时时间未被消费
一个简单的死信处理流程图如下:
流程图说明:
- 生产者投递消息,消费者监听队列消息
- 产生死信消息时,投递到死信队列
- 死信消费者消费死信消息,存入到数据库,并进行人工干预处理
2. 创建死信交换机、队列
死信交换机、死信队列
需要我们自己创建,只是业务中用来存放死信的“特殊”
交换机队列。其他队列可以指定死信交换机、死信队列
,当发生死信时,自动将其投递到死信队列
中。创建死信交换机、死信队列
:
@Configuration
public class RabbitMqDeadQueueConfig {
private static final String DEAD_QUEUE = "deadQueue";
private static final String DEAD_EXCHANGE = "deadExchange";
private static final String DEAD_ROUTE_KEY = "dead.key";
/**
* 死信队列
*/
@Bean(DEAD_QUEUE)
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
/**
* 死信交换机
*/
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange() {
return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
}
/**
* 创建死信队列和死信交换机的绑定关系
*/
@Bean("deadBinding")
public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
return BindingBuilder.bind(deadQueue).to(directExchange).with(DEAD_ROUTE_KEY).and(null);
}
}
创建正常的业务消费队列,并指定指定死信交换机、路由KEY :
@Configuration
public class RabbitMqConfig {
private static final String DEAD_QUEUE = "deadQueue";
private static final String DEAD_EXCHANGE = "deadExchange";
private static final String DEAD_ROUTE_KEY = "dead.key";
/**
* 使用 ExchangeBuilder 创建交换机
*/
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.directExchange("bootExchange").durable(true).build();
}
/**
* 创建队列:
* 指定死信交换机、路由KEY
*/
@Bean("bootQueue")
public Queue bootQueue001() {
return QueueBuilder.durable("bootQueue").deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
}
}
创建一个死信消费者,消费死信,存入到数据库,等待人工干预处理:
@RabbitListener(queues = {"deadQueue"})
public void receiveMessage001(Message message) {
System.out.println("收到死信消息" + new String(message.getBody()));
System.out.println("存入数据库,等待人工干预");
}
3. 过期导致死信
首先模拟过期导致死信,注释正常业务消费者代码,发送一条TTL
消息:
// 1. 消息过期
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000");
Message message = new Message("HELLO TTL".getBytes(), messageProperties);
rabbitTemplate.send("bootExchange", "boot.key", message);
到了过期时间后,由于没有消费者消费该消息,成为死信,最终被死信消费者接收到:
4. 拒接消费
在之前,我们介绍过消息确认ACK机制,当拒收消息时,也可以放入死信队列中。参考上面的文档,开启手动确认模式,可以看到拒接消息后,也会存放到死信队列中:
5. 长度限制
创建队列,指定长度为1
,当队列中的消息已经达到了这个最大长度限制时,再次投递,消息将被挤掉,被挤掉的会进入死信队列。
/**
* 创建队列:
* 指定死信交换机、路由KEY、数量限制
*/
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable("bootQueue").maxLength(1).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
}
批量发送一百条消息:
~~~java Message message = new Message("HELLO WORLD".getBytes(StandardCharsets.UTF_8)); for (int i = 0; i < 100; i++) { rabbitTemplate.send("bootExchange", "boot.key", message); } ~~~
关闭业务监听消费者,运行发送消息,可以看到大量消息进入死信队列:
查看控制台,只有第一条消息被保存在队列中: