# 7. SpringBoot 2.x 整合 RabbitMQ
# 7.1 Hello World 模型
# 7.1.1 导入 maven 依赖坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# 7.1.2 配置 RabbitMQ
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: 172.16.208.140
port: 5672
username: hedon
password: hedon
virtual-host: /hedon
项目启动后,会自动注入 RabbitTemplate
,该对象可以简化对 RabbitMQ 的使用。
# 7.1.3 生产者
关键对象:rabbitTemplate
@SpringBootTest
public class TestRabbitMQ {
//注入 RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* ① Hello World 模型
*/
@Test
public void testHelloWorld(){
/**
* 参数1:队列名称
* 参数2:消息内容
*
* 注意:当没有消费者在监听的时候,生产者的运行是不会有任何效果的。
*/
rabbitTemplate.convertAndSend("hello","hello world");
}
}
# 7.1.4 消费者
关键注解:
- @RabbitListener:申明监听的队列
- @Queue:具体指明哪一个队列及其相应的属性
- @RabbitHandler:指定收到消息时的回调方法
@Component
//注明消费者在监听 hello 这个队列
@RabbitListener(queuesToDeclare = @Queue(value = "hello",durable = "false",autoDelete = "true"))
public class HelloConsumer {
/**
* RabbitHandler 注解指明这是接收到消息时的回调方法
* message 参数是传过来的消息
*/
@RabbitHandler
public void receive(String message){
System.out.println("message = " + message);
}
}
# 7.1.5 测试
运行生产者所在的单元测试方法 testHelloWorld(),观察消费者是否接收到消息。
# 7.2 Work Queues 模型
# 7.2.1 平均消费
默认就是平均消费的。
# 7.2.1.1 生产者
/**
* ② Work Queues 模型
*/
@Test
public void testWorkQueues(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","work 模型发出的第 "+i+" 消息");
}
}
# 7.2.1.2 消费者
@Component
public class WorkConsumer {
/**
* 如果直接在方法上加 @RabbitListener 注解的话,那就不需要加 @RabbitHandler 注解了,默认就加上了。
*/
@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void receive1(String message){
System.out.println("这是 work 模型的消息者1得到的 message = " + message);
}
@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void receive2(String message){
System.out.println("这是 work 模型的消息者2得到的 message = " + message);
}
}
# 7.2.1.3 测试
# 7.2.2 能者多劳
待补。
# 7.3 Fanout 模型
# 7.3.1 生产者
/**
* ③ Fanout 模型
*/
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("logs","","这是 Fanout 模型发送的消息");
}
# 7.3.2 消费者
@Component
public class FanoutConsumer {
//因为是临时队列,所以不需要进行 queuesToDeclare
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不指定名称的话就会给我们随机创建临时队列
exchange = @Exchange(value = "ex_logs",type = "fanout") //指定绑定的交换机
)
})
public void receive1(String message){
System.out.println("消费者1 message = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, //不指定名称的话就会给我们随机创建临时队列
exchange = @Exchange(value = "ex_logs",type = "fanout") //指定绑定的交换机
)
})
public void receive2(String message){
System.out.println("消费者2 message = " + message);
}
}
# 7.3.3 测试
# 掉坑记录
这里笔者遇到了一个报错: ……………… received 'true' but current is 'false'。
# 报错原因
报错的原因其实是 queue 前后两次声明是不一样的,比如第一次声明的 queue 是要持久化的,第二次不想持久化了,这样就会报错。
# 解决方案
解决的办法就是去 web 管理界面删掉原来的 queue,然后再重新声明。
# 7.4 Routing 模型
# 7.4.1 生产者
/**
* ④ Routing 模型
*/
@Test
public void testRouting() {
rabbitTemplate.convertAndSend("routing_directs","info","这是 Routing 模型 发送 info 的 key 的信息");
rabbitTemplate.convertAndSend("routing_directs","error","这是 Routing 模型 发送 error 的 key 的信息");
rabbitTemplate.convertAndSend("routing_directs","warning","这是 Routing 模型 发送 warning 的 key 的信息");
}
# 7.4.2 消费者
@Component
public class RouteConsumer {
@RabbitListener( bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "routing_directs",type = "direct"), //默认就是 direct 类型的
key = {"info","error","warning"} //接收 3 种类型的 routingKey 信息
)
})
public void receive1(String message){
System.out.println("消费者1 接受到的消息 message = "+ message);
}
@RabbitListener( bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "routing_directs",type = "direct"),
key = {"info"} //只接收 info 类型的 routingKey 信息
)
})
public void receive2(String message){
System.out.println("消费者2 接受到的消息 message = "+ message);
}
}
# 7.4.3 测试
# 7.5 Topics 模型
Topics 模型其实就是 Routing 模型将交换机的类型从 direct 改成 ==topic==,然后支持通配符。
# 7.5.1 生产者
/**
* ⑤ Topics 模型
*/
@Test
public void testTopics(){
rabbitTemplate.convertAndSend("routing_topics","info","这是 Topics 模型发送的 info 类型信息");
rabbitTemplate.convertAndSend("routing_topics","info.a","这是 Topics 模型发送的 info.a 类型信息");
rabbitTemplate.convertAndSend("routing_topics","info.a.b","这是 Topics 模型发送的 info.a.b 类型信息");
rabbitTemplate.convertAndSend("routing_topics","info.a.b.c","这是 Topics 模型发送的 info.a.b.c 类型信息");
}
# 7.5.2 消费者
@Component
public class TopicsConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "routing_topics",type = "topic"),
key = {"info.#"} // # 可以匹配 n 个单词
)
})
public void receive1(String message){
System.out.println("这是消费者1收到的消息 message = "+message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "routing_topics",type = "topic"),
key = {"info.*"} // * 只能匹配 1 个单词
)
})
public void receive2(String message){
System.out.println("这是消费者2收到的消息 message = "+message);
}
}
# 7.5.3 测试
# 7.6 RPC 模型
待补。
# 7.7 Publisher Confirms 模型
待补。