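RabbitMQ: RabbitOperations in Detail
1. Introduction
In the previous article we covered all the methods of the AmqpTemplate interface that RabbitTemplate implements. Next we look at the other interface it implements, RabbitOperations. AmqpTemplate provides support for the AMQP protocol and covers basic sending and receiving of messages, while RabbitOperations integrates directly with RabbitMQ and offers finer-grained operations.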
2. send
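The send methods mainly add a CorrelationData parameter. CorrelationData wraps the data used by the publisher confirm and return modes: the object carries back the ACK together with the reason, and when return mode is enabled it also carries the returned message.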
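public class CorrelationData implements Correlation {
    // Result of the asynchronous operation; Confirm is the type of the result
    private final SettableListenableFuture<Confirm> future = new SettableListenableFuture<>();
    // Unique ID; set automatically to a unique value if none is provided
    private volatile String id;
    // The returned message, populated when the message is returned
    private volatile ReturnedMessage returnedMessage;
}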
// Send a message, passing a CorrelationData object
default void send(String routingKey, Message message, CorrelationData correlationData)
        throws AmqpException {
    throw new UnsupportedOperationException("This implementation does not support this method");
}
// Specify the exchange and routing key, passing a CorrelationData object
void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
        throws AmqpException;
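To make the role of a correlation object more concrete, here is a minimal sketch that models the mechanism with JDK classes only. ConfirmResult, Correlation and FakePublisher are hypothetical stand-ins invented for this illustration, not Spring AMQP types, and no broker is involved: the publisher remembers each correlation object by its id, and a later (simulated) confirm completes that object's future with the ack flag and a reason.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins that model the idea; these are NOT Spring AMQP classes.
final class ConfirmResult {
    final boolean ack;
    final String reason; // null when the broker acked

    ConfirmResult(boolean ack, String reason) {
        this.ack = ack;
        this.reason = reason;
    }
}

final class Correlation {
    final String id;
    final CompletableFuture<ConfirmResult> future = new CompletableFuture<>();

    // A unique id is generated when the caller does not supply one.
    Correlation(String id) {
        this.id = (id != null) ? id : UUID.randomUUID().toString();
    }
}

final class FakePublisher {
    private final Map<String, Correlation> pending = new ConcurrentHashMap<>();

    // "Send": remember the correlation so that a later confirm can find it.
    void send(String routingKey, byte[] body, Correlation correlation) {
        pending.put(correlation.id, correlation);
    }

    // Called when the simulated broker confirms; completes the caller's future.
    void onConfirm(String correlationId, boolean ack, String reason) {
        Correlation correlation = pending.remove(correlationId);
        if (correlation != null) {
            correlation.future.complete(new ConfirmResult(ack, reason));
        }
    }
}

final class CorrelationDemo {
    // Publishes one message, simulates a nack, and reports what the future holds.
    static String run() {
        FakePublisher publisher = new FakePublisher();
        Correlation correlation = new Correlation("order-1");
        publisher.send("orders", new byte[0], correlation);
        publisher.onConfirm("order-1", false, "queue full");
        ConfirmResult result = correlation.future.join();
        return correlation.id + ":" + result.ack + ":" + result.reason;
    }
}
```

The point of the design is that the caller keeps a handle (the correlation object) whose future is completed asynchronously, so the sending code never has to block unless it chooses to.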
3. convertAndSend
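convertAndSend builds on the send methods: it can send a Java object directly, and a MessagePostProcessor can optionally be supplied to process the message.
// Send to the default exchange with a custom routing key, carrying CorrelationData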
void convertAndSend(String routingKey, Object message, CorrelationData correlationData) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData) throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;
void correlationConvertAndSend(Object message, CorrelationData correlationData) throws AmqpException;
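The convert-then-post-process order described above can be sketched with JDK classes only. SimpleMessage and ConvertingSender are hypothetical stand-ins rather than Spring AMQP types, and the conversion here (String.valueOf plus UTF-8 encoding) is an assumption made for brevity, not the behaviour of any real message converter.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-ins that model the idea; these are NOT Spring AMQP classes.
final class SimpleMessage {
    final byte[] body;
    final Map<String, Object> headers = new HashMap<>();

    SimpleMessage(byte[] body) {
        this.body = body;
    }
}

final class ConvertingSender {
    // Stands in for the broker: everything "sent" ends up in this list.
    final List<SimpleMessage> sent = new ArrayList<>();

    // Step 1: convert the object; step 2: post-process; step 3: "send".
    void convertAndSend(Object payload, UnaryOperator<SimpleMessage> postProcessor) {
        byte[] body = String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
        SimpleMessage message = new SimpleMessage(body);
        if (postProcessor != null) {
            message = postProcessor.apply(message);
        }
        sent.add(message);
    }
}

final class ConvertAndSendDemo {
    // Sends the number 42 and adds a header after conversion.
    static SimpleMessage run() {
        ConvertingSender sender = new ConvertingSender();
        sender.convertAndSend(42, message -> {
            message.headers.put("tenant", "demo");
            return message;
        });
        return sender.sent.get(0);
    }
}
```

Because the post-processor runs after conversion, it sees the finished message and can adjust headers or properties without knowing anything about the original Java object.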
4. convertSendAndReceive
Like the send-and-receive methods of the AmqpTemplate interface, convertSendAndReceive follows the RPC pattern; the difference is the additional CorrelationData parameter.
@Nullable
Object convertSendAndReceive(Object message, CorrelationData correlationData) throws AmqpException;
@Nullable
Object convertSendAndReceive(String routingKey, Object message, CorrelationData correlationData)
throws AmqpException;
@Nullable
Object convertSendAndReceive(String exchange, String routingKey, Object message,
CorrelationData correlationData) throws AmqpException;
@Nullable
Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor,
CorrelationData correlationData) throws AmqpException;
@Nullable
Object convertSendAndReceive(String routingKey, Object message,
MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;
@Nullable
Object convertSendAndReceive(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
throws AmqpException;
@Nullable
<T> T convertSendAndReceiveAsType(Object message, CorrelationData correlationData,
ParameterizedTypeReference<T> responseType) throws AmqpException;
@Nullable
<T> T convertSendAndReceiveAsType(String routingKey, Object message, CorrelationData correlationData,
ParameterizedTypeReference<T> responseType) throws AmqpException;
@Nullable
default <T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
@Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType)
throws AmqpException {
return convertSendAndReceiveAsType(exchange, routingKey, message, null, correlationData, responseType);
}
@Nullable
<T> T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,
CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException;
@Nullable
<T> T convertSendAndReceiveAsType(String routingKey, Object message,
MessagePostProcessor messagePostProcessor, CorrelationData correlationData,
ParameterizedTypeReference<T> responseType) throws AmqpException;
@Nullable
<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
@Nullable MessagePostProcessor messagePostProcessor,
@Nullable CorrelationData correlationData,
ParameterizedTypeReference<T> responseType) throws AmqpException;
5. execute
The execute method gives access to the native Channel so that operations can be performed on it directly; it takes a ChannelCallback parameter.
@Nullable
<T> T execute(ChannelCallback<T> action) throws AmqpException;
ChannelCallback is a functional interface. Through it you obtain the RabbitMQ Channel, perform arbitrary operations on it, and return a result.
@FunctionalInterface
public interface ChannelCallback<T> {
    /**
     * @param channel the channel
     * @return the result
     */
    @Nullable
    T doInRabbit(Channel channel) throws Exception;
}
Example:
ChannelCallback<Boolean> channelCallback = new ChannelCallback<Boolean>() {
    @Override
    public Boolean doInRabbit(Channel channel) throws Exception {
        // Publish a message directly through the Channel
        channel.basicPublish(MqBizConfig.BIZ_EXCHANGE, MqBizConfig.BIZ_ROUTE_KEY, null, "message".getBytes());
        System.out.println("doInRabbit");
        return true;
    }
};
Boolean result = rabbitTemplate.execute(channelCallback);
System.out.println("Result: " + result);
6. invoke
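The invoke method takes an OperationsCallback parameter. Inside that object's doInRabbit() method, every operation uses the same dedicated channel, which is closed at the end rather than returned to the cache. This style of use is called scoped operations.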
@Nullable
default <T> T invoke(OperationsCallback<T> action) throws AmqpException {
    return invoke(action, null, null);
}
@Nullable
<T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.ConfirmCallback acks,
        @Nullable com.rabbitmq.client.ConfirmCallback nacks);
OperationsCallback is the operations callback: it receives the RabbitOperations, performs operations through it, and returns a result.
@FunctionalInterface
interface OperationsCallback<T> {
    /**
     * @param operations the RabbitOperations.
     * @return the result.
     */
    @Nullable
    T doInRabbit(RabbitOperations operations);
}
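The difference between a normal operation and a scoped one can be illustrated with a small single-threaded model that uses only JDK classes. FakeChannel and ScopedTemplate are hypothetical stand-ins, not Spring AMQP or RabbitMQ client types: ordinary calls borrow a channel from a cache and give it back, while calls made inside invoke share one dedicated channel that is closed afterwards and never cached.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-ins that model the idea; NOT Spring AMQP or RabbitMQ client classes.
final class FakeChannel {
    private static final AtomicInteger COUNTER = new AtomicInteger();
    final int number = COUNTER.incrementAndGet();
    private boolean open = true;

    void close() {
        open = false;
    }

    boolean isOpen() {
        return open;
    }
}

final class ScopedTemplate {
    private final Deque<FakeChannel> cache = new ArrayDeque<>();
    private final ThreadLocal<FakeChannel> dedicated = new ThreadLocal<>();

    // Returns the number of the channel the "publish" went out on.
    int publish(String body) {
        FakeChannel scoped = dedicated.get();
        if (scoped != null) {
            return scoped.number; // inside invoke(): always the dedicated channel
        }
        FakeChannel channel = cache.isEmpty() ? new FakeChannel() : cache.pop();
        try {
            return channel.number;
        } finally {
            cache.push(channel); // normal operation: back to the cache
        }
    }

    // Scoped operation: the callback's calls all see one dedicated channel,
    // which is closed at the end and never put into the cache.
    <T> T invoke(Function<ScopedTemplate, T> action) {
        FakeChannel channel = new FakeChannel();
        dedicated.set(channel);
        try {
            return action.apply(this);
        } finally {
            dedicated.remove();
            channel.close();
        }
    }

    int cachedChannels() {
        return cache.size();
    }
}

final class ScopedDemo {
    // Two publishes inside one invoke() go out on the same channel.
    static boolean sameChannelInsideInvoke() {
        ScopedTemplate template = new ScopedTemplate();
        Boolean same = template.invoke(t -> t.publish("a") == t.publish("b"));
        return same;
    }
}
```

Pinning the channel for the duration of the callback is what makes channel-level features such as waiting for confirms meaningful, since those only apply to messages published on that same channel.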
7. waitForConfirms
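waitForConfirms and waitForConfirmsOrDie both wait for publisher confirms, but they must be called inside the invoke method.
// Wait for confirms
boolean waitForConfirms(long timeout) throws AmqpException;
// Wait for confirms; on failure the channel is closed and the producer can no longer publish on it
void waitForConfirmsOrDie(long timeout) throws AmqpException;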
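As a rough illustration of the two waiting styles, the following sketch models a confirming channel with JDK classes only. ConfirmingChannel is a hypothetical stand-in, not the RabbitMQ client Channel, and its use of IllegalStateException for timeouts and nacks is an assumption chosen to keep the example free of checked exceptions.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in that models the idea; NOT the RabbitMQ client Channel.
final class ConfirmingChannel {
    private final Set<Long> unconfirmed = new HashSet<>();
    private long nextSeqNo = 1;
    private boolean allAcked = true;
    private boolean open = true;

    synchronized long publish(String body) {
        if (!open) {
            throw new IllegalStateException("channel is closed");
        }
        long seqNo = nextSeqNo++;
        unconfirmed.add(seqNo);
        return seqNo;
    }

    // Invoked by the simulated broker for each ack or nack.
    synchronized void handleConfirm(long seqNo, boolean ack) {
        unconfirmed.remove(seqNo);
        if (!ack) {
            allAcked = false;
        }
        notifyAll();
    }

    // Blocks until nothing is outstanding; true only if every message was acked.
    synchronized boolean waitForConfirms(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!unconfirmed.isEmpty()) {
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMillis <= 0) {
                throw new IllegalStateException("timed out waiting for confirms");
            }
            try {
                wait(remainingMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        boolean result = allAcked;
        allAcked = true; // reset for the next batch
        return result;
    }

    // Same wait, but a nack or a timeout closes the channel and throws.
    synchronized void waitForConfirmsOrDie(long timeoutMillis) {
        boolean acked;
        try {
            acked = waitForConfirms(timeoutMillis);
        } catch (IllegalStateException e) {
            open = false;
            throw e;
        }
        if (!acked) {
            open = false;
            throw new IllegalStateException("message nacked; channel closed");
        }
    }

    synchronized boolean isOpen() {
        return open;
    }
}

final class ConfirmDemo {
    static boolean allAcked() {
        ConfirmingChannel channel = new ConfirmingChannel();
        channel.handleConfirm(channel.publish("a"), true);
        channel.handleConfirm(channel.publish("b"), true);
        return channel.waitForConfirms(1000);
    }

    static boolean closesOnNack() {
        ConfirmingChannel channel = new ConfirmingChannel();
        channel.handleConfirm(channel.publish("a"), false);
        try {
            channel.waitForConfirmsOrDie(1000);
            return false;
        } catch (IllegalStateException expected) {
            return !channel.isOpen();
        }
    }
}
```

In this model the first style reports the outcome and leaves the decision to the caller, while the second treats any nack or timeout as fatal for the channel.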
8. getConnectionFactory
Returns the connection factory for these operations.
ConnectionFactory getConnectionFactory();
9. Other
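start and stop are no-op methods with no real implementation; they exist only for backward compatibility, and isRunning always returns false.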
@Override
default void start() {
// No-op - implemented for backward compatibility
}
@Override
default void stop() {
// No-op - implemented for backward compatibility
}
@Override
default boolean isRunning() {
return false;
}