RabbitMQ延迟队列

1. 前言

延迟队列 :即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求场景: 用户下单后,30分钟未支付,取消订单,回滚库存。

本文介绍了三种方式实现,前两种存在一定的局限性

2. 过期消息实现延迟队列

发送带有TTL过期属性的消息,到达过期时间后,投递到死信队列,实现延迟队列功能。添加一个死信交换机、死信队列:

@Configuration
public class RabbitMqDeadQueueConfig {
   private static final String DEAD_QUEUE = "deadQueue";

    private static final String DEAD_EXCHANGE = "deadExchange";

    private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 死信队列
     */
    @Bean(DEAD_QUEUE)
    public Queue deadQueue() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    /**
     * 死信交换机
     */
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange() {
        return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
    }

    /**
     * 创建死信队列和死信交换机的绑定关系
     */
    @Bean("deadBinding")
    public Binding deadBinding(@Qualifier(DEAD_QUEUE) Queue deadQueue, @Qualifier(DEAD_EXCHANGE) Exchange directExchange) {
        return BindingBuilder.bind(deadQueue).to(directExchange).with(DEAD_ROUTE_KEY).and(null);
    }
}

添加一个延迟交换机、延迟队列:

@Configuration
public class RabbitMqDelayQueueConfig {

   private static final String DEAD_EXCHANGE = "deadExchange";
   private static final String DEAD_ROUTE_KEY = "dead.key";

    /**
     * 使用 ExchangeBuilder 创建交换机
     */
    @Bean("delayExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.directExchange("delayExchange").durable(true).build();
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue bootQueue001() {
        return QueueBuilder.durable("delayQueue").deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").and(null);
    }
}

创建一个消费者,接收死信消息:

@Component
public class RabbitConsumer {
    @RabbitListener(queues = {"deadQueue"})
    public void deadQueue(Message message) {
        System.out.println("收到消息" + new String(message.getBody()));
        System.out.println("当前时间:"+ LocalDateTime.now());
        System.out.println("判断订单状态...." + new String(message.getBody()));
        System.out.println("未支付,回滚数据库....");
    }

}

发送TTL订单消息:

MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000"); // 10秒过期
System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
Message message = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
rabbitTemplate.send("delayExchange", "delay.key", message);

运行程序,可以看到,过了10秒,收到了订单信息~

image-20241022154118447

但是该方式存在一个致命缺陷 ,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列。RabbitMQ是等消息到达队列顶部即将被消费时,才会判断其是否过期并删除。所以即使消息过期,也不会马上从队列中抹去。

首先发送一个过期时间为20秒的消息,再发送一个过期时间为10秒的消息:

// 发送过期时间为20秒的消息,先到达队列
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("20000"); // 20秒过期
System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
rabbitTemplate.send("delayExchange", "delay.key", message001);
// 发送过期时间为10秒的消息,后达队列
messageProperties.setExpiration("10000"); // 20秒过期
System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
rabbitTemplate.send("delayExchange", "delay.key", message002);

测试发现,第二条过期时间为10秒的消息,虽然过期时间更短,但也需要等到第一条过期后,到达消息顶部,才会被扫描是否过期,由此可见, 过期消息实现延迟队列并不可取~~~

image-20241022154229853

3. 过期队列实现延迟队列

实现思路:

1. 用户下单后生成订单发送订单消息到延迟队列中并设置过期时间为30分钟该队列没有消费者
2. 订单队列消息过期后发送订单至死信队列
3. 死信队列消费者接收到消息后判断订单状态进行后续操作

image-20241022154300401

给整个队列添加过期时间实现延迟队列。由于过期时间作用于整个队列,所以不是很灵活,比如设置30分钟需要一个队列,设置10分钟时,又需要创建一个队列。

创建一个过期时间队列:

@Bean("delayQueue")
public Queue delayQueue() {
    return QueueBuilder.durable("delayQueue").withArgument("x-message-ttl", 10000).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTE_KEY).build();
}

参照上面的案例发送消息即可。

4. 插件实现延迟队列

上面我们讨论了两种实现延迟队列的方式,但是都存在一些问题,官网也提供了基于插件的方式来实现。消息到达延迟交换机后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。

image-20241022155056210

4.1 安装插件

官网中提供了很多插件,以满足更多的功能需求。

GitJHub中下载延迟插件:

image-20241022154610802

将其放在RabbitMQ程序主目录的plugins下:

image-20241022154638810

切换到sbin目录下,运行安装插件命令:

~~~shell rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange ~~~

成功安装提示:

image-20241022154738723

重启RabbitMQ,进入到交换机页面,添加交换机,可以看到一个新的类型为x-delayed-message,说明插件安装成功:

image-20241022154801451

4.2 代码实现

首先创建x-delayed-message类型的交换机,并绑定队列:

@Configuration
public class RabbitMqDelayQueueConfig {
   @Bean("delayExchange")
    public CustomExchange delayExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delayExchange", "x-delayed-message", true, false, arguments);
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable("delayQueue").build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").noargs();
    }
}

创建消费者,监听延迟队列:

@RabbitListener(queues = {"delayQueue"})
public void deadQueue(Message message) {
    System.out.println("收到消息" + new String(message.getBody()));
    System.out.println("当前时间:"+ LocalDateTime.now());
    System.out.println("判断订单状态...." + new String(message.getBody()));
    System.out.println("未支付,回滚数据库....");
}

创建生产者,发送不同延迟时间的消息:

// 发送延迟时间为20秒的消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDelay(20000); // 20秒过期
System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
rabbitTemplate.send("delayExchange", "delay.key", message001);
// 发送延迟时间为10秒的消息
messageProperties.setDelay(10000); // 10秒过期
System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
rabbitTemplate.send("delayExchange", "delay.key", message002);

4.3 测试

可以看到第一条消息和第二条消息,都是在各自指定的延迟时间后被消费,并没有出现时序问题,而且每个消息都具有不同的延迟时间,灵活性很高。

所以插件是实际应用中常用的一种方式,要实现延迟队列功能,当前其他MQ,甚至Redis也可以~~~

image-20241022154857174