# 7. SpringBoot 2.x 整合 RabbitMQ

# 7.1 Hello World 模型

# 7.1.1 导入 maven 依赖坐标

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

# 7.1.2 配置 RabbitMQ

spring:
  application:
    name: rabbitmq-springboot
  rabbitmq:
    host: 172.16.208.140
    port: 5672
    username: hedon
    password: hedon
    virtual-host: /hedon

项目启动后,会自动注入 RabbitTemplate,该对象可以简化对 RabbitMQ 的使用。

# 7.1.3 生产者

关键对象:rabbitTemplate

@SpringBootTest
public class TestRabbitMQ {

    //注入 RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * ① Hello World 模型
     */
    @Test
    public void testHelloWorld(){
        /**
         * 参数1:队列名称
         * 参数2:消息内容
         *
         * 注意:当没有消费者在监听的时候,生产者的运行是不会有任何效果的。
         */
        rabbitTemplate.convertAndSend("hello","hello world");
    }
}

# 7.1.4 消费者

关键注解:

  • @RabbitListener:申明监听的队列
    • @Queue:具体指明哪一个队列及其相应的属性
  • @RabbitHandler:指定收到消息时的回调方法
@Component
//注明消费者在监听 hello 这个队列
@RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "true"))
public class HelloConsumer {

    /**
     * RabbitHandler 注解指明这是接收到消息时的回调方法
     * message 参数是传过来的消息
     */
    @RabbitHandler
    public void receive(String message){
        System.out.println("message = " + message);
    }
}

# 7.1.5 测试

运行生产者所在的单元测试方法 testHelloWorld(),观察消费者是否接收到消息。

image-20200924160320272


# 7.2 Work Queues 模型

# 7.2.1 平均消费

默认就是平均消费的。

# 7.2.1.1 生产者

/**
 * ② Work Queues 模型
 */
@Test
public void testWorkQueues(){
  for (int i = 0; i < 10; i++) {
    rabbitTemplate.convertAndSend("work","work 模型发出的第 "+i+" 消息");
  }
}

# 7.2.1.2 消费者

@Component
public class WorkConsumer {

    /**
     * 如果直接在方法上加  @RabbitListener 注解的话,那就不需要加 @RabbitHandler 注解了,默认就加上了。
     */
    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
    public void receive1(String message){
        System.out.println("这是 work 模型的消息者1得到的 message = " + message);
    }


    @RabbitListener(queuesToDeclare = @Queue(value = "work"))
    public void receive2(String message){
        System.out.println("这是 work 模型的消息者2得到的 message = " + message);
    }
}

# 7.2.1.3 测试

image-20200924161119662

# 7.2.2 能者多劳

待补。


# 7.3 Fanout 模型

# 7.3.1 生产者

/**
 * ③ Fanout 模型
 */
@Test
public void testFanout(){
    rabbitTemplate.convertAndSend("logs","","这是 Fanout 模型发送的消息");
}

# 7.3.2 消费者

@Component
public class FanoutConsumer {

    //因为是临时队列,所以不需要进行 queuesToDeclare
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,                                     //不指定名称的话就会给我们随机创建临时队列
                    exchange = @Exchange(value = "ex_logs",type = "fanout") //指定绑定的交换机
            )
    })
    public void receive1(String message){
        System.out.println("消费者1 message = " + message);
    }


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,                                     //不指定名称的话就会给我们随机创建临时队列
                    exchange = @Exchange(value = "ex_logs",type = "fanout") //指定绑定的交换机
            )
    })
    public void receive2(String message){
        System.out.println("消费者2 message = " + message);
    }

}

# 7.3.3 测试

image-20200924164335169

# 掉坑记录
这里笔者遇到了一个报错: ……………… received 'true' but current is 'false'。
# 报错原因
报错的原因其实是 queue 前后两次声明是不一样的,比如第一次声明的 queue 是要持久化的,第二次不想持久化了,这样就会报错。
# 解决方案
解决的办法就是去 web 管理界面删掉原来的 queue,然后再重新声明。

# 7.4 Routing 模型

# 7.4.1 生产者

/**
 * ④ Routing 模型
 */
@Test
public void testRouting() {
    rabbitTemplate.convertAndSend("routing_directs","info","这是 Routing 模型 发送 info 的 key 的信息");
    rabbitTemplate.convertAndSend("routing_directs","error","这是 Routing 模型 发送 error 的 key 的信息");
    rabbitTemplate.convertAndSend("routing_directs","warning","这是 Routing 模型 发送 warning 的 key 的信息");
}

# 7.4.2 消费者

@Component
public class RouteConsumer {

    @RabbitListener( bindings = {
            @QueueBinding(
                    value =  @Queue,
                    exchange = @Exchange(value = "routing_directs",type = "direct"), //默认就是 direct 类型的
                    key = {"info","error","warning"}  //接收 3 种类型的 routingKey 信息
            )
    })
    public void receive1(String message){
        System.out.println("消费者1 接受到的消息 message = "+ message);
    }

    @RabbitListener( bindings = {
            @QueueBinding(
                    value =  @Queue,
                    exchange = @Exchange(value = "routing_directs",type = "direct"),
                    key = {"info"}          //只接收 info 类型的 routingKey 信息
            )
    })
    public void receive2(String message){
        System.out.println("消费者2 接受到的消息 message = "+ message);
    }
}

# 7.4.3 测试

image-20200924165509444


# 7.5 Topics 模型

Topics 模型其实就是 Routing 模型将交换机的类型从 direct 改成 ==topic==,然后支持通配符。

# 7.5.1 生产者

/**
 * ⑤ Topics 模型
 */
@Test
public void testTopics(){
  rabbitTemplate.convertAndSend("routing_topics","info","这是 Topics 模型发送的 info 类型信息");
  rabbitTemplate.convertAndSend("routing_topics","info.a","这是 Topics 模型发送的 info.a 类型信息");
  rabbitTemplate.convertAndSend("routing_topics","info.a.b","这是 Topics 模型发送的 info.a.b 类型信息");
  rabbitTemplate.convertAndSend("routing_topics","info.a.b.c","这是 Topics 模型发送的 info.a.b.c 类型信息");
}

# 7.5.2 消费者

@Component
public class TopicsConsumer {


    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "routing_topics",type = "topic"),
                    key = {"info.#"}  // # 可以匹配 n 个单词
            )
    })
    public void receive1(String message){
        System.out.println("这是消费者1收到的消息 message = "+message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "routing_topics",type = "topic"),
                    key = {"info.*"}  // * 只能匹配 1 个单词
            )
    })
    public void receive2(String message){
        System.out.println("这是消费者2收到的消息 message = "+message);
    }

}

# 7.5.3 测试

image-20200924170806479


# 7.6 RPC 模型

待补。

# 7.7 Publisher Confirms 模型

待补。


上次更新: 8/29/2022, 12:17:25 AM