Spring-AMQP 的使用记录

2016/3/21 posted in  Spring Boot comments

Spring-AMQP 特性

  • Listener container 使用 POJO 异步处理消息
  • 提供一个高度抽象的 template(RabbitTemplate)发送和接收消息
  • 使用 RabbitAdmin 来自动声明队列、Exchange 和 Binding

队列、Exchange 和 Binding 的设置

  • Spring way
    使用 spring-boot-starter-amqp 会自动配置 ConnectionFactoryRabbitTemplateAmqpAdmin 三个 Bean。

    @Configuration
    public class RabbitConfiguration {
    @Bean
    public DirectExchange exchange() {
    return new DirectExchange("infrastructure.direct");
    }
    @Bean
    public Queue queue(){
    return new Queue("queueName");
    }
    @Bean
    public Binding binding(){
    return BindingBuilder.bind(queue()).to(exchange()).with("routeKey");
    }
    }
  • Just java

    ConnectionFactory connectionFactory = new CachingConnectionFactory();
    RabbitAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
    Queue queue = new Queue("quueuName");
    DirectExchange exchange = new DirectExchange("exchangeName");
    amqpAdmin.declareQueue(queue);
    amqpAdmin.declareExchange(exchange);
    amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("routeKey"));

生产消息

@Autowired
private RabbitTemplate rabbitTemplate;

// 三种方式发送消息
rabbitTemplate.send();
rabbitTemplate.sendAndReceive(); // 用于 RPC 模式的消息发送并等待响应
rabbitTemplate.convertAndSend(); // 将 Java 对象转换成 Message 并发送

消费消息

在 Listener container 中设置一个简单的 POJO 对象来异步处理消息

  • 消息 Listener 实现

    // 三种方式
    public class MessageHandler implements MessageListener {
    @Override
    public void onMessage(Message message) {
    }
    }
    public class MessageHandler implements ChannelAwareMessageListener {
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
    }
    }
    public class MessageHandler {
    public void handleMessage(Message message);
    }
  • 设置 MessageListenerContainer

    @Autowired
    private ConnectionFactory connectionFactory;
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueName");
    container.setMessageListener(new MessageListenerAdapter(new MessageHandler()));
    container.setXXX(); 设置队列、listener 等
    container.start();
    container.addQueueNames(); // 动态添加队列

处理消息异常

默认情况下,listener 抛出的所有异常都将包装成 ListenerExecutionFailedException 异常,然后将消息重新放入队列。
因为 MessageListenerContainer 默认设置 ErrorHandlerConditionalRejectingErrorHandlerConditionalRejectingErrorHandler 的默认策略是 DefaultExceptionStrategy,当 DefaultExceptionStrategyisFatal 返回 true 时,该消息将被忽略。
默认发生 MessageConversionException 异常的消息将被丢弃。我们也可以根据情况自己实现 ErrorHandler 来处理消息异常。