RabbitMQ优先级队列详解
1. 前言
RabbitMQ
将消息写入队列中都是按顺序写的,消费时也是按顺序进行消费,队列中的消息是先进先出(FIFO).。
首先测试一下没有优先级的效果。先注释掉所有的消费者,不然发一个就消费一个,看不出优先级的效果,只有消息堆积在一起时,才能看到效果。
先发送10条消息:
发送完成后,再开启消费者进行消费,没有设置优先级,那么先到达队列的消息先消费,收到的消息会从按照0-9
的顺序消费。可以看到其时按照投递顺序消费,先进先出:
在一些场景中我们需要将某些消息优先处理,也就是设置优先级
,优先级高的消息优先被消费。
2. 设置优先级队列
可以直接在控制台,设置队列的最大优先级,优先级的取值范围为0~255
,一般设置最大值为10
。
添加优先级后,可以在队列中看到Pri
标记:
在代码中,可以添加x-max-priority
参数设置:
@Bean("priorityQueue")
public Queue priorityQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-priority", 10);
return QueueBuilder.durable("priorityQueue").withArguments(arguments).build();
}
@Bean("priorityExchange")
public Exchange priorityExchange() {
return ExchangeBuilder.directExchange("priorityExchange").durable(true).build();
}
@Bean("priorityBinding")
public Binding priorityBinding(@Qualifier("priorityQueue") Queue priorityQueue, @Qualifier("priorityExchange") Exchange priorityExchange) {
return BindingBuilder.bind(priorityQueue).to(priorityExchange).with("priority.key").noargs();
}
也可以使用maxPriority
方法设置:
@Bean("priorityQueue")
public Queue priorityQueue() {
// maxPriority 设置最大优先级
return QueueBuilder.durable("priorityQueue").maxPriority(10).build();
}
3. 消息设置优先级
设置完优先级队列后,发送消息时,需要设置优先级。这里循环发送十条消息,优先级从0~9
递增:
for (int i = 0; i < 10; i++) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setPriority(i); // 设置优先级
System.out.println("发送消息,优先级为:"+i);
Message message = new Message(("消息优先级为:"+i).getBytes(), messageProperties);
TimeUnit.SECONDS.sleep(1);
rabbitTemplate.send("priorityExchange", "priority.key", message);
}
发送消息:
开启消费者消费,可以看到优先级高的优先被消费: