RabbitMQ优先级队列详解

1. 前言

RabbitMQ将消息写入队列中都是按顺序写的,消费时也是按顺序进行消费,队列中的消息是先进先出(FIFO).。

image-20241022110252246

首先测试一下没有优先级的效果。先注释掉所有的消费者,不然发一个就消费一个,看不出优先级的效果,只有消息堆积在一起时,才能看到效果。

先发送10条消息:

image-20241022110348678

发送完成后,再开启消费者进行消费,没有设置优先级,那么先到达队列的消息先消费,收到的消息会从按照0-9的顺序消费。可以看到其时按照投递顺序消费,先进先出:

image-20241022110436762

在一些场景中我们需要将某些消息优先处理,也就是设置优先级,优先级高的消息优先被消费。

2. 设置优先级队列

可以直接在控制台,设置队列的最大优先级,优先级的取值范围为0~255,一般设置最大值为10

image-20241022110516714

添加优先级后,可以在队列中看到Pri标记:

image-20241022110537909

在代码中,可以添加x-max-priority参数设置:

@Bean("priorityQueue")
public Queue priorityQueue() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-max-priority", 10);
    return QueueBuilder.durable("priorityQueue").withArguments(arguments).build();
}

@Bean("priorityExchange")
public Exchange priorityExchange() {
    return ExchangeBuilder.directExchange("priorityExchange").durable(true).build();
}

@Bean("priorityBinding")
public Binding priorityBinding(@Qualifier("priorityQueue") Queue priorityQueue, @Qualifier("priorityExchange") Exchange priorityExchange) {
    return BindingBuilder.bind(priorityQueue).to(priorityExchange).with("priority.key").noargs();
}

也可以使用maxPriority方法设置:

@Bean("priorityQueue")
public Queue priorityQueue() {
    // maxPriority 设置最大优先级
    return QueueBuilder.durable("priorityQueue").maxPriority(10).build();
}

3. 消息设置优先级

设置完优先级队列后,发送消息时,需要设置优先级。这里循环发送十条消息,优先级从0~9 递增:

for (int i = 0; i < 10; i++) {
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setPriority(i); // 设置优先级
    System.out.println("发送消息,优先级为:"+i);
    Message message = new Message(("消息优先级为:"+i).getBytes(), messageProperties);
    TimeUnit.SECONDS.sleep(1);
    rabbitTemplate.send("priorityExchange", "priority.key", message);
}

发送消息:

image-20241022110627294

开启消费者消费,可以看到优先级高的优先被消费:

image-20241022110708453