RabbitMQ: RabbitOperations in Detail

1. Introduction

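In the previous article we covered all the methods that RabbitTemplate implements from the AmqpTemplate interface. Next we look at the other interface it implements, RabbitOperations (which itself extends AmqpTemplate). AmqpTemplate provides support for the AMQP protocol and covers basic sending and receiving of messages, while RabbitOperations integrates directly with RabbitMQ and offers finer-grained operations.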

2. send

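The send methods mainly add a CorrelationData parameter. CorrelationData wraps the data used with publisher confirms and returns: it carries the confirm result (ack or nack) together with the reason, and when returns are enabled it also carries the returned message.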

public class CorrelationData implements Correlation {
    // Result of the asynchronous operation; Confirm is the result type
    private final SettableListenableFuture<Confirm> future = new SettableListenableFuture<>();
    // Unique id; if none is supplied, one is generated automatically
    private volatile String id;
    // The returned message, populated when a message is returned
    private volatile ReturnedMessage returnedMessage;
}
// Send a message, passing a CorrelationData object
default void send(String routingKey, Message message, CorrelationData correlationData)
        throws AmqpException {

    throw new UnsupportedOperationException("This implementation does not support this method");
}

// Specify the exchange and routing key, and pass a CorrelationData object
void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
        throws AmqpException;
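
To make the role of CorrelationData more concrete, here is a minimal, self-contained sketch of the idea: a correlation object holding an id and a future that is completed once the broker confirms. SketchConfirm, SketchCorrelationData and FakeBroker are stand-ins invented for this illustration; they are not Spring AMQP classes and only mimic the shape described above, under the assumption that a confirm arrives as an ack flag plus an optional reason.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Stand-in for the confirm result: ack flag plus an optional reason (hypothetical class).
final class SketchConfirm {
    final boolean ack;
    final String reason;

    SketchConfirm(boolean ack, String reason) {
        this.ack = ack;
        this.reason = reason;
    }
}

// Stand-in for CorrelationData: an id plus a future completed when the confirm arrives.
final class SketchCorrelationData {
    final String id;
    final CompletableFuture<SketchConfirm> future = new CompletableFuture<>();

    SketchCorrelationData() {
        // No id supplied: generate a unique one.
        this(UUID.randomUUID().toString());
    }

    SketchCorrelationData(String id) {
        this.id = id;
    }
}

// Hypothetical in-memory "broker" that confirms every publish immediately.
final class FakeBroker {
    // Acks messages that have a routing key; nacks those with an empty one.
    static void publish(String routingKey, byte[] body, SketchCorrelationData correlation) {
        boolean routable = routingKey != null && !routingKey.isEmpty();
        correlation.future.complete(
                new SketchConfirm(routable, routable ? null : "no routing key"));
    }

    // Convenience helper: publish a text message and report whether it was acked.
    static boolean publishAndCheckAck(String routingKey, String text) {
        SketchCorrelationData correlation = new SketchCorrelationData();
        publish(routingKey, text.getBytes(StandardCharsets.UTF_8), correlation);
        // join() does not block here because the fake broker completes the future synchronously.
        return correlation.future.join().ack;
    }
}
```

With the real RabbitTemplate the flow would be similar in spirit: create a CorrelationData, pass it to send, and inspect its future once the confirm has arrived.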

3. convertAndSend

convertAndSend builds on the send methods: it can send a Java object directly, and it can optionally take a MessagePostProcessor.

// Send to the default exchange with a custom routing key, carrying a CorrelationData
void convertAndSend(String routingKey, Object message, CorrelationData correlationData) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData)  throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;

void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor,  CorrelationData correlationData) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor,  CorrelationData correlationData) throws AmqpException;

void correlationConvertAndSend(Object message, CorrelationData correlationData) throws AmqpException;
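
The two steps that convertAndSend adds on top of send (convert the object, then post-process the resulting message) can be sketched in plain Java. SketchMessage, SketchPostProcessor and ConvertAndSendSketch are hypothetical stand-ins, not Spring AMQP types, and the "converter" here is assumed to be a simple toString() to UTF-8 bytes rather than a real MessageConverter.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Stand-in for an AMQP message: a body plus string headers (hypothetical class).
final class SketchMessage {
    final byte[] body;
    final Map<String, String> headers = new HashMap<>();

    SketchMessage(byte[] body) {
        this.body = body;
    }
}

// Stand-in for MessagePostProcessor: receives the converted message and may modify it.
interface SketchPostProcessor {
    SketchMessage postProcess(SketchMessage message);
}

final class ConvertAndSendSketch {
    // Step 1: convert the Java object to a message (assumption: toString() as UTF-8 text).
    static SketchMessage convert(Object payload) {
        SketchMessage message = new SketchMessage(
                String.valueOf(payload).getBytes(StandardCharsets.UTF_8));
        message.headers.put("contentType", "text/plain");
        return message;
    }

    // Step 2: apply the optional post-processor; the result is what would be passed to send(...).
    static SketchMessage convertAndPostProcess(Object payload, SketchPostProcessor postProcessor) {
        SketchMessage message = convert(payload);
        return postProcessor == null ? message : postProcessor.postProcess(message);
    }
}
```

A post-processor is typically where per-message headers or properties get set, for example `ConvertAndSendSketch.convertAndPostProcess("hi", m -> { m.headers.put("tenant", "acme"); return m; })` in this sketch.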

4. convertSendAndReceive

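convertSendAndReceive works like the send-and-receive methods in the AmqpTemplate interface: it follows the RPC pattern. The difference is the additional CorrelationData parameter.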

@Nullable
Object convertSendAndReceive(Object message, CorrelationData correlationData) throws AmqpException;

@Nullable
Object convertSendAndReceive(String routingKey, Object message, CorrelationData correlationData)
        throws AmqpException;

@Nullable
Object convertSendAndReceive(String exchange, String routingKey, Object message,
        CorrelationData correlationData) throws AmqpException;

@Nullable
Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor,
        CorrelationData correlationData) throws AmqpException;

@Nullable
Object convertSendAndReceive(String routingKey, Object message,
        MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;

@Nullable
Object convertSendAndReceive(String exchange, String routingKey, Object message,
        MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
        throws AmqpException;

@Nullable
<T> T convertSendAndReceiveAsType(Object message, CorrelationData correlationData,
        ParameterizedTypeReference<T> responseType) throws AmqpException;

@Nullable
<T> T convertSendAndReceiveAsType(String routingKey, Object message, CorrelationData correlationData,
        ParameterizedTypeReference<T> responseType) throws AmqpException;

@Nullable
default <T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
        @Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType)
                throws AmqpException {

    return convertSendAndReceiveAsType(exchange, routingKey, message, null, correlationData, responseType);
}

@Nullable
<T> T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,
        CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException;

@Nullable
<T> T convertSendAndReceiveAsType(String routingKey, Object message,
        MessagePostProcessor messagePostProcessor, CorrelationData correlationData,
        ParameterizedTypeReference<T> responseType) throws AmqpException;

@Nullable
<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
        @Nullable MessagePostProcessor messagePostProcessor,
        @Nullable CorrelationData correlationData,
        ParameterizedTypeReference<T> responseType) throws AmqpException;
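
For readers less familiar with the RPC pattern behind these methods, the following is a rough in-memory sketch of request/reply: the caller sends a request, blocks until the matching reply arrives, and gets null if it times out. RpcSketch and its members are invented for illustration and use only JDK classes. Note that the correlation id in this sketch matches replies to requests; it is a separate concern from the CorrelationData parameter, which relates to publisher confirms.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

final class RpcSketch {
    // A request as it would travel through the request queue: correlation id plus payload.
    private static final class Request {
        final String correlationId;
        final String payload;

        Request(String correlationId, String payload) {
            this.correlationId = correlationId;
            this.payload = payload;
        }
    }

    private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
    private final Map<String, CompletableFuture<String>> pendingReplies = new ConcurrentHashMap<>();

    // Starts a daemon "server" thread that consumes requests and replies by correlation id.
    RpcSketch(Function<String, String> handler) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Request request = requestQueue.take();
                    CompletableFuture<String> reply = pendingReplies.remove(request.correlationId);
                    if (reply != null) {
                        reply.complete(handler.apply(request.payload));
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    // Sends a request and blocks for the reply; returns null on timeout,
    // in line with the @Nullable return type of the methods above.
    String sendAndReceive(String payload, long timeoutMillis) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pendingReplies.put(correlationId, reply);
        requestQueue.add(new Request(correlationId, payload));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        } finally {
            pendingReplies.remove(correlationId);
        }
    }
}
```

In this sketch, `new RpcSketch(String::toUpperCase).sendAndReceive("hello", 2000)` plays the role of a convertSendAndReceive call whose remote handler upper-cases the request.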

5. execute

The execute method gives access to the native Channel to perform operations; it takes a ChannelCallback parameter.

@Nullable
<T> T execute(ChannelCallback<T> action) throws AmqpException;

ChannelCallback is a functional interface. Through it you obtain the RabbitMQ Channel, perform arbitrary operations on it, and return a result.

@FunctionalInterface
public interface ChannelCallback<T> {
    /**
     * @param channel the channel
     * @return the result
     */
    @Nullable
    T doInRabbit(Channel channel) throws Exception;
}

Example:

// Imports needed: java.nio.charset.StandardCharsets, com.rabbitmq.client.Channel,
// org.springframework.amqp.rabbit.core.ChannelCallback
ChannelCallback<Boolean> publishCallback = new ChannelCallback<Boolean>() {
    @Override
    public Boolean doInRabbit(Channel channel) throws Exception {
        // Publish a message directly through the Channel
        channel.basicPublish(MqBizConfig.BIZ_EXCHANGE, MqBizConfig.BIZ_ROUTE_KEY, null,
                "message".getBytes(StandardCharsets.UTF_8));
        System.out.println("doInRabbit");
        return true;
    }
};
Boolean result = rabbitTemplate.execute(publishCallback);
System.out.println("Result: " + result);

6. invoke

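The invoke method takes an OperationsCallback. Within that callback's doInRabbit() method, all operations use the same dedicated channel, which is closed at the end (it is not returned to the cache). This style of use is called scoped operations.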

@Nullable
default <T> T invoke(OperationsCallback<T> action) throws AmqpException {
    return invoke(action, null, null);
}

@Nullable
<T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.ConfirmCallback acks,
        @Nullable com.rabbitmq.client.ConfirmCallback nacks);

OperationsCallback is the operations callback: it receives a RabbitOperations to perform operations with, and returns a result.

@FunctionalInterface
interface OperationsCallback<T> {
    /**
     * @param operations RabbitOperations.
     * @return the result.
     */
    @Nullable
    T doInRabbit(RabbitOperations operations);
}

7. waitForConfirms

waitForConfirms and waitForConfirmsOrDie both wait for publisher confirms, but they must be called within the scope of an invoke call.

// Wait for confirms
boolean waitForConfirms(long timeout) throws AmqpException;

// Wait for confirms; on failure the channel is closed and the producer cannot publish any more messages on it
void waitForConfirmsOrDie(long timeout) throws AmqpException;
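
The scoping rule described in the last two sections (one dedicated channel for everything inside invoke, closed afterwards, and waitForConfirms only valid inside that scope) can be modelled with a small JDK-only sketch. SketchChannel and ScopedTemplateSketch are hypothetical stand-ins that model only the channel bookkeeping, not real messaging; the IllegalStateException and the "fresh channel per send outside the scope" behaviour are simplifying choices of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Stand-in for a channel: records what was published on it (hypothetical class).
final class SketchChannel {
    final int id;
    final List<String> published = new ArrayList<>();
    boolean open = true;

    SketchChannel(int id) {
        this.id = id;
    }
}

// Stand-in for the template; it only models channel scoping.
final class ScopedTemplateSketch {
    private int nextChannelId = 1;
    private SketchChannel dedicated; // non-null only while invoke(...) is running
    final List<SketchChannel> closedChannels = new ArrayList<>();

    // Runs the callback with one dedicated channel and closes that channel afterwards.
    <T> T invoke(Function<ScopedTemplateSketch, T> action) {
        dedicated = new SketchChannel(nextChannelId++);
        try {
            return action.apply(this);
        } finally {
            dedicated.open = false; // closed rather than returned to a cache
            closedChannels.add(dedicated);
            dedicated = null;
        }
    }

    // Inside invoke(...) every send uses the dedicated channel;
    // outside, this sketch simply uses a fresh channel each time.
    int send(String text) {
        SketchChannel channel = dedicated != null ? dedicated : new SketchChannel(nextChannelId++);
        channel.published.add(text);
        return channel.id;
    }

    // Only meaningful inside invoke(...): otherwise there is no dedicated channel to wait on.
    boolean waitForConfirms(long timeoutMillis) {
        if (dedicated == null) {
            throw new IllegalStateException("waitForConfirms must be called within invoke");
        }
        return true; // the sketch pretends every publish was acked
    }
}
```

With the real template, the corresponding usage would be along the lines of calling convertAndSend several times inside the invoke callback and then calling waitForConfirmsOrDie on the same RabbitOperations before returning.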

8. getConnectionFactory

Returns the connection factory used by these operations.

ConnectionFactory getConnectionFactory();

9. Other

start and stop are no-op methods (and isRunning always returns false); they exist only for backward compatibility.

@Override
default void start() {
    // No-op - implemented for backward compatibility
}

@Override
default void stop() {
    // No-op - implemented for backward compatibility
}

@Override
default boolean isRunning() {
    return false;
}