RabbitMQ消息可靠之发布确认、退回机制

1. 前言

在正常的情况下,一个消息的完整生命周期为:发布=》交换机=》队列=》消费如下图所示

image-20241021142811947

但由于各种原因,存在可靠性问题,比如:

  • 消息发送了,但未正常到达交换机
  • 到达了交换机,但没有找到匹配的队列,消息丢失
  • 消息已投递,但未被消费者成功接收到
  • 消费者消费时,消息处理异常

为了解决以上种种问题,因此发布者和消费者都需要一种用于传递和处理确认的机制。RabbitMQ提供了各种机制,首先是消息发布可靠性机制

消息发布可靠性机制是指消息发送方杜绝任何发送消息丢失或者投递失败 场景,确保消息被正常投递到队列中,RabbitMQ提供了两种模式。

  • 确认模式(confirm)

  • 退回模式(return)

2. 退回模式

退回模式(return)说的是当消息到达交换机后,但是没有找到匹配的队列时,将消息回退给生产者。默认情况下,如果消息没有匹配到队列会直接丢弃,采用退回模式可以在生产者端监听改消息是否被成功投递到队列中。

2.1 开启

在配置文件中开启退回模式:

spring:
  rabbitmq:
    username: guest
    password: guest
    host: localhost
    port: 5672
    # 开启回退模式
    publisher-returns: true

2.2 回调函数

RabbitTemplate设置一个returnsCallback回调函数,如果消息路由键没有匹配的队列,投递出去后将被退回,执行回调函数。

// 回退模式回调函数
rabbitTemplate.setReturnsCallback(returnedMessage -> {
    System.out.println("消息被回退,原因:"+returnedMessage.getReplyText());
    System.out.println(returnedMessage.getMessage()); // 回退消息
    System.out.println(  returnedMessage.getExchange()); // 交换机
    System.out.println(returnedMessage.getReplyCode()); // 返回原因的代码
    System.out.println(returnedMessage.getReplyText()); // 返回信息,例如NO_ROUTE
    System.out.println(returnedMessage.getRoutingKey()); // 路由KEY
});
// 路由键aaa.bbb.key,没有匹配的队列
rabbitTemplate.convertAndSend("bootExchange", "aaa.bbb.key", "HELLO SPRING BOOT");

2.3 测试

运行程序,可以看到回调函数被执行,并返回原因为NO_ROUTE

image-20241021143037504

3. 确认模式

确认模式(confirm):生产者发送消息后,交换机收到消息会执行确认回调函数,告诉生产者,消息成功投递到交换机中。

3.1 配置

开启发布确认模式配置项publisher-confirm-type,该配置项是枚举,可以配置为NONESIMPLECORRELATED

spring:
  rabbitmq:
    username: guest
    password: guest
    host: localhost
    port: 5672
    # 开启
    publisher-confirm-type: CORRELATED
    # springboot2.2.0,该属性已过时,通过publisher-confirm-type配置
    # publisher-confirms: true

NONE 表示关闭确认回调,这是默认配置。

3.2 CORRELATED

CORRELATED翻译过来是相关的意思,源码关于该配置项的解释是与{@code CorrelationData}一起使用以将确认与发送的消息。这是Spring 整合RabbitMQ最开始提供的一种确认模式使用方式,通过给RabbitTemplate设置一个确认回调函数,来获取确认结果。RabbitTemplate只支持一个ConfirmCallback

setConfirmCallback方法中,需要一个实现了RabbitTemplate.ConfirmCallback函数式接口的对象,并实现其confirm方法,该方法有三个参数:

  • correlationData :客户端在发送原始消息时提供的对象。
  • ack :exchange交换机是否成功收到了消息。true成功,false代表失败。
  • cause :失败原因。
@FunctionalInterface
public interface ConfirmCallback {
  void confirm(@Nullable CorrelationData correlationData , boolean ack, @Nullable String cause);
}

代码如下

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("进入回调方法");
        System.out.println("消息ID:" + correlationData.getId());
        // 如果开启了退回模式,还可以通过correlationData获取退回消息
        if (correlationData.getReturned() != null) {
            System.out.println("消息被退回,原因:" + correlationData.getReturned().getReplyText());
        }
        if (b) {
            System.out.println("确认接收到消息");
        } else {
            System.out.println("失败,原因:" + s);
        }
    }
});
// 可以创建CorrelationData对象,设置id属性,用来表示当前消息的唯一性。
// 确认时,可以获取ID,知道是哪个消息
CorrelationData correlationData = new CorrelationData("34424343");
rabbitTemplate.convertAndSend("bootExchange", "boot222.key", "HELLO SPRING BOOT", correlationData);

执行结果 :服务端返回确认消息,改消息已成功接收,但是由于没有找到匹配理由,被退回。

image-20241021143222426

3.3 SIMPLE

SIMPLE类型,源码注释如下:

 Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()} within scoped operations.

大意为:直接在范围内的操作可直接使用RabbitTemplatewaitForConfirms()或者waitForConfirmsOrDie()方法。

什么是范围内操作?

通常,在使用RabbitTemplate模板类时,从缓存中获取或创建通道,用于执行操作,然后将通道返回缓存以供重用。在多线程环境中,无法保证下一个操作使用相同的通道。但是,有时需要对通道的使用有更多的控制,并确保在同一通道上执行许多操作。

2.0版 开始,RabbitTemplate提供了一个名为invoke()的新方法:

default <T> T invoke(OperationsCallback<T> action) throws AmqpException {
    return this.invoke(action, (ConfirmCallback)null, (ConfirmCallback)null);
}

该方法需要一个OperationsCallback参数,在该对象的doInRabbit()方法中,任何操作都使用相同的专用通道,该通道将在结束时关闭(不会返回到缓存)。这种使用方式就叫做范围内操作

修改配置为SIMPLE

spring:
  rabbitmq:
    # 开启回退模式
    publisher-returns: true
    publisher-confirm-type: simple

直接运行第二节中的代码,可以看到SIMPLECORRELATED作用一致:

image-20241021143222426

除此之外,SIMPLE还可以直接在范围内操作直接使用RabbitTemplatewaitForConfirms()或者waitForConfirmsOrDie()方法,这个时候就需要使用invoke()

Boolean invokeResult = rabbitTemplate.invoke(new RabbitOperations.OperationsCallback<Boolean>() {
    @Override
    public Boolean doInRabbit(RabbitOperations rabbitOperations) {
        // 1. 发送消息
        for (int i = 0; i < 100; i++) {
            System.out.println(i);
            rabbitOperations.convertAndSend("bootExchange", "aaa.bbb.key", "HELLO SPRING BOOT");
        }
        // 2. 等待确认并返回结果,具有阻塞性, 参数为超时时间,如果未在超时时间内消息代理确认该消息,则该方法将引发超时的异常。
        return rabbitOperations.waitForConfirms(1L);
        // return rabbitTemplate.waitForConfirmsOrDie(1L); 异常后信道被关闭,生产者发布不能继续发布消息
    }
});
if (Boolean.TRUE.equals(invokeResult)) {
    System.out.println("确认收到");
} else {
    System.out.println("没有收到");
}