RabbitMQ消息可靠之发布确认、退回机制
1. 前言
在正常的情况下,一个消息的完整生命周期为:发布=》交换机=》队列=》消费
。如下图所示 :
但由于各种原因,存在可靠性问题,比如:
- 消息发送了,但未正常到达交换机
- 到达了交换机,但没有找到匹配的队列,消息丢失
- 消息已投递,但未被消费者成功接收到
- 消费者消费时,消息处理异常
为了解决以上种种问题,因此发布者和消费者都需要一种用于传递和处理确认的机制。RabbitMQ
提供了各种机制,首先是消息发布可靠性机制
。
消息发布可靠性机制
是指消息发送方杜绝任何发送消息丢失或者投递失败 场景,确保消息被正常投递到队列中,RabbitMQ
提供了两种模式。
-
确认模式(confirm)
-
退回模式(return)
2. 退回模式
退回模式(return)
说的是当消息到达交换机后,但是没有找到匹配的队列时,将消息回退给生产者。默认情况下,如果消息没有匹配到队列会直接丢弃,采用退回模式
可以在生产者端监听改消息是否被成功投递到队列中。
2.1 开启
在配置文件中开启退回模式:
spring:
rabbitmq:
username: guest
password: guest
host: localhost
port: 5672
# 开启回退模式
publisher-returns: true
2.2 回调函数
给RabbitTemplate
设置一个returnsCallback
回调函数,如果消息路由键没有匹配的队列,投递出去后将被退回,执行回调函数。
// 回退模式回调函数
rabbitTemplate.setReturnsCallback(returnedMessage -> {
System.out.println("消息被回退,原因:"+returnedMessage.getReplyText());
System.out.println(returnedMessage.getMessage()); // 回退消息
System.out.println( returnedMessage.getExchange()); // 交换机
System.out.println(returnedMessage.getReplyCode()); // 返回原因的代码
System.out.println(returnedMessage.getReplyText()); // 返回信息,例如NO_ROUTE
System.out.println(returnedMessage.getRoutingKey()); // 路由KEY
});
// 路由键aaa.bbb.key,没有匹配的队列
rabbitTemplate.convertAndSend("bootExchange", "aaa.bbb.key", "HELLO SPRING BOOT");
2.3 测试
运行程序,可以看到回调函数被执行,并返回原因为NO_ROUTE
:
3. 确认模式
确认模式(confirm)
:生产者发送消息后,交换机收到消息会执行确认回调函数,告诉生产者,消息成功投递到交换机中。
3.1 配置
开启发布确认模式配置项publisher-confirm-type
,该配置项是枚举,可以配置为NONE
,SIMPLE
,CORRELATED
。
spring:
rabbitmq:
username: guest
password: guest
host: localhost
port: 5672
# 开启
publisher-confirm-type: CORRELATED
# springboot2.2.0,该属性已过时,通过publisher-confirm-type配置
# publisher-confirms: true
NONE
表示关闭确认回调,这是默认配置。
3.2 CORRELATED
CORRELATED
翻译过来是相关的意思,源码关于该配置项的解释是与{@code CorrelationData}一起使用以将确认与发送的消息
。这是Spring
整合RabbitMQ
最开始提供的一种确认模式使用方式,通过给RabbitTemplate
设置一个确认回调函数,来获取确认结果。RabbitTemplate
只支持一个ConfirmCallback
。
在setConfirmCallback
方法中,需要一个实现了RabbitTemplate.ConfirmCallback
函数式接口的对象,并实现其confirm
方法,该方法有三个参数:
- correlationData :客户端在发送原始消息时提供的对象。
- ack :exchange交换机是否成功收到了消息。true成功,false代表失败。
- cause :失败原因。
@FunctionalInterface
public interface ConfirmCallback {
void confirm(@Nullable CorrelationData correlationData , boolean ack, @Nullable String cause);
}
代码如下 :
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("进入回调方法");
System.out.println("消息ID:" + correlationData.getId());
// 如果开启了退回模式,还可以通过correlationData获取退回消息
if (correlationData.getReturned() != null) {
System.out.println("消息被退回,原因:" + correlationData.getReturned().getReplyText());
}
if (b) {
System.out.println("确认接收到消息");
} else {
System.out.println("失败,原因:" + s);
}
}
});
// 可以创建CorrelationData对象,设置id属性,用来表示当前消息的唯一性。
// 确认时,可以获取ID,知道是哪个消息
CorrelationData correlationData = new CorrelationData("34424343");
rabbitTemplate.convertAndSend("bootExchange", "boot222.key", "HELLO SPRING BOOT", correlationData);
执行结果 :服务端返回确认消息,改消息已成功接收,但是由于没有找到匹配理由,被退回。
3.3 SIMPLE
SIMPLE
类型,源码注释如下:
Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()} within scoped operations.
大意为:直接在范围内的操作可直接使用RabbitTemplate
的waitForConfirms()
或者waitForConfirmsOrDie()
方法。
什么是范围内操作?
通常,在使用RabbitTemplate
模板类时,从缓存中获取或创建通道,用于执行操作,然后将通道返回缓存以供重用。在多线程环境中,无法保证下一个操作使用相同的通道。但是,有时需要对通道的使用有更多的控制,并确保在同一通道上执行许多操作。
从2.0版 开始,RabbitTemplate
提供了一个名为invoke()
的新方法:
default <T> T invoke(OperationsCallback<T> action) throws AmqpException {
return this.invoke(action, (ConfirmCallback)null, (ConfirmCallback)null);
}
该方法需要一个OperationsCallback
参数,在该对象的doInRabbit()
方法中,任何操作都使用相同的专用通道,该通道将在结束时关闭(不会返回到缓存)。这种使用方式就叫做范围内操作
。
修改配置为SIMPLE
:
spring:
rabbitmq:
# 开启回退模式
publisher-returns: true
publisher-confirm-type: simple
直接运行第二节中的代码,可以看到SIMPLE
和CORRELATED
作用一致:
除此之外,SIMPLE
还可以直接在范围内操作直接使用RabbitTemplate
的waitForConfirms()
或者waitForConfirmsOrDie()
方法,这个时候就需要使用invoke()
。
Boolean invokeResult = rabbitTemplate.invoke(new RabbitOperations.OperationsCallback<Boolean>() {
@Override
public Boolean doInRabbit(RabbitOperations rabbitOperations) {
// 1. 发送消息
for (int i = 0; i < 100; i++) {
System.out.println(i);
rabbitOperations.convertAndSend("bootExchange", "aaa.bbb.key", "HELLO SPRING BOOT");
}
// 2. 等待确认并返回结果,具有阻塞性, 参数为超时时间,如果未在超时时间内消息代理确认该消息,则该方法将引发超时的异常。
return rabbitOperations.waitForConfirms(1L);
// return rabbitTemplate.waitForConfirmsOrDie(1L); 异常后信道被关闭,生产者发布不能继续发布消息
}
});
if (Boolean.TRUE.equals(invokeResult)) {
System.out.println("确认收到");
} else {
System.out.println("没有收到");
}