RabbitMQ 死信队列

1. 概念

无法被消费的消息被称为死信,存放死信的队列也就是死信队列。由于某些特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信。例如消费消息时,发生异常,经过一定次数的重试后,该消息依然无法被正常消费,此时可以将该消息放入死信队列中,后续进行人工干预。

消息成为死信的三种情况:

  1. 队列消息长度到达限制
  2. 消费者异常拒接消费消息
  3. 原队列存在消息过期设置,消息到达超时时间未被消费

一个简单的死信处理流程图如下:

image-20241022105512130

流程图说明:

  1. 生产者投递消息,消费者监听队列消息
  2. 产生死信消息时,投递到死信队列
  3. 死信消费者消费死信消息,存入到数据库,并进行人工干预处理

2. 创建死信交换机、队列

死信交换机、死信队列需要我们自己创建,只是业务中用来存放死信的“特殊”交换机队列。其他队列可以指定死信交换机、死信队列,当发生死信时,自动将其投递到死信队列中。创建死信交换机、死信队列

@Configuration
public class RabbitMqDeadQueueConfig {

    private static final String DEAD_QUEUE = "deadQueue";

    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 死信队列
     */
    @Bean(DEAD_QUEUE)
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /**
     * 死信交换机
     */
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
    }

    /**
     * 创建死信队列和死信交换机的绑定关系
     */
    @Bean("deadBinding")
    public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
        return BindingBuilder.bind(deadQueue).to(directExchange).with(DEAD_ROUTE_KEY).and(null);
    }
}

创建正常的业务消费队列,并指定指定死信交换机、路由KEY

@Configuration
public class RabbitMqConfig {
    private static final String DEAD_QUEUE = "deadQueue";

    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 使用 ExchangeBuilder 创建交换机
     */
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange("bootExchange").durable(true).build();
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("bootQueue")
    public Queue bootQueue001() {
        return QueueBuilder.durable("bootQueue").deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
    }
}

创建一个死信消费者,消费死信,存入到数据库,等待人工干预处理:

@RabbitListener(queues = {"deadQueue"})
public void receiveMessage001(Message message) {
    System.out.println("收到死信消息" + new String(message.getBody()));
    System.out.println("存入数据库,等待人工干预");
}

3. 过期导致死信

首先模拟过期导致死信,注释正常业务消费者代码,发送一条TTL 消息:

// 1. 消息过期
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000");
Message message = new Message("HELLO TTL".getBytes(), messageProperties);
rabbitTemplate.send("bootExchange", "boot.key", message);

到了过期时间后,由于没有消费者消费该消息,成为死信,最终被死信消费者接收到:

image-20241022105615122

4. 拒接消费

在之前,我们介绍过消息确认ACK机制,当拒收消息时,也可以放入死信队列中。参考上面的文档,开启手动确认模式,可以看到拒接消息后,也会存放到死信队列中:

image-20241022105639969

5. 长度限制

创建队列,指定长度为1,当队列中的消息已经达到了这个最大长度限制时,再次投递,消息将被挤掉,被挤掉的会进入死信队列。

/**
 * 创建队列:
 * 指定死信交换机、路由KEY、数量限制
 */
@Bean("bootQueue")
public Queue bootQueue() {
    return QueueBuilder.durable("bootQueue").maxLength(1).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
}

批量发送一百条消息:

~~~java Message message = new Message("HELLO WORLD".getBytes(StandardCharsets.UTF_8)); for (int i = 0; i < 100; i++) { rabbitTemplate.send("bootExchange", "boot.key", message); } ~~~

关闭业务监听消费者,运行发送消息,可以看到大量消息进入死信队列:

image-20241022105718455

查看控制台,只有第一条消息被保存在队列中:

image-20241022105742909