RabbitMQ备份交换机

1. 前言

在之前,我们分析了消息可靠性之发布确认、退回机制。当消息到达交换机后,但是没有找到匹配的队列时,退回模式(return)将消息回退给生产者。使用RabbitTemplate发送消息,实际调用的还是底层RabbitMQChannel完成,例如退回模式(return)开启时,调用channel.basicPublish方法,并设置了一个重要参数mandatorytrue

image-20241022110815378

mandatorytrue时,如果交换机根据自身类型和消息路由键无法找到一个符合条件的队列时,那么会调用return方法将消息返回给生产者。当mandatory设置为false时,出现上述情形RabbitMQ会直接将消息扔掉。

除了退回模式RabbitMQ还提供了备份交换机机制,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理。

流程图如下说示:

image-20241022110750812

注意事项 :当退回模式和备份交换机一起使用的时候,备份交换机的优先级比较高,不会执行回退消息的回调。

2. 代码实现

声明备份交换机、队列,类型为Fanout

@Configuration
public class MqBackupConfig {

    public static final String BACKUP_QUEUE = "backupQueue";

    public static final String BACKUP_EXCHANGE = "backupExchange";

    public static final String BACKUP_ROUTE_KEY = "backup.key";

    @Bean(BACKUP_QUEUE)
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }

    @Bean(BACKUP_EXCHANGE)
    public FanoutExchange backupExchange() {
        return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE).durable(true).build();
    }

    @Bean("backupBinding")
    public Binding backupBinding(@Qualifier(BACKUP_QUEUE) Queue backupQueue, @Qualifier(BACKUP_EXCHANGE) FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

}

声明业务交换机、队列,并绑定备份交换机:

@Configuration
public class MqBizConfig {
    public static final String BIZ_QUEUE = "bizQueue";

    public static final String BIZ_EXCHANGE = "bizExchange";

    public static final String BIZ_ROUTE_KEY = "biz.key";

    @Bean(BIZ_QUEUE)
    public Queue bizQueue() {
        return QueueBuilder.durable(BIZ_QUEUE).build();
    }

    @Bean(BIZ_EXCHANGE)
    public Exchange bizExchange() {
        // 使用alternate-exchange 设置备份交换机
        return ExchangeBuilder.directExchange(BIZ_EXCHANGE).withArgument("alternate-exchange", MqBackupConfig.BACKUP_EXCHANGE).durable(true).build();
    }

    @Bean("bizBinding")
    public Binding bizBinding(@Qualifier(BIZ_QUEUE) Queue bizQueue, @Qualifier(BIZ_EXCHANGE) Exchange bizExchange) {
        return BindingBuilder.bind(bizQueue).to(bizExchange).with(BIZ_ROUTE_KEY).noargs();
    }

}

设置备份队列消费者:

@RabbitListener(queues = {MqBackupConfig.BACKUP_QUEUE})
public void receiveMessage(Message message) {
    System.out.println("备份队列收到消息" + new String(message.getBody()));
    System.out.println("发送告警信息给管理员");
    System.out.println("存入数据库等待人工处理");
}

发送一条路由不正确的消息:

Message message = new Message("HELLO WORLD".getBytes(StandardCharsets.UTF_8));
rabbitTemplate.send(MqBizConfig.BIZ_EXCHANGE, "boot.key", message);

3. 测试

发送消息,未被投递的消息被转发到备份交换机:

image-20241022110839645