Spring-AMQP 特性
- Listener container 使用 POJO 异步处理消息
- 提供一个高度抽象的 template(RabbitTemplate)发送和接收消息
- 使用 RabbitAdmin 来自动声明队列、Exchange 和 Binding
队列、Exchange 和 Binding 的设置
Spring way
使用spring-boot-starter-amqp
会自动配置ConnectionFactory
、RabbitTemplate
和AmqpAdmin
三个 Bean。@Configuration public class RabbitConfiguration {
@Bean
public DirectExchange exchange() {
return new DirectExchange("infrastructure.direct");
}
@Bean
public Queue queue(){
return new Queue("queueName");
}
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(exchange()).with("routeKey");
}
}Just java
ConnectionFactory connectionFactory = new CachingConnectionFactory(); RabbitAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
Queue queue = new Queue("quueuName");
DirectExchange exchange = new DirectExchange("exchangeName");
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("routeKey"));
生产消息
@Autowired
private RabbitTemplate rabbitTemplate;
// 三种方式发送消息
rabbitTemplate.send();
rabbitTemplate.sendAndReceive(); // 用于 RPC 模式的消息发送并等待响应
rabbitTemplate.convertAndSend(); // 将 Java 对象转换成 Message 并发送
消费消息
在 Listener container 中设置一个简单的 POJO 对象来异步处理消息
消息 Listener 实现
// 三种方式 public class MessageHandler implements MessageListener {
@Override
public void onMessage(Message message) {
}
}
public class MessageHandler implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
}
}
public class MessageHandler {
public void handleMessage(Message message);
}设置 MessageListenerContainer
@Autowired private ConnectionFactory connectionFactory;
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames("queueName");
container.setMessageListener(new MessageListenerAdapter(new MessageHandler()));
container.setXXX(); 设置队列、listener 等
container.start();
container.addQueueNames(); // 动态添加队列
处理消息异常
默认情况下,listener 抛出的所有异常都将包装成 ListenerExecutionFailedException
异常,然后将消息重新放入队列。
因为 MessageListenerContainer
默认设置 ErrorHandler
为 ConditionalRejectingErrorHandler
,ConditionalRejectingErrorHandler
的默认策略是 DefaultExceptionStrategy
,当 DefaultExceptionStrategy
的 isFatal
返回 true 时,该消息将被忽略。
默认发生 MessageConversionException
异常的消息将被丢弃。我们也可以根据情况自己实现 ErrorHandler
来处理消息异常。