大家好,我是工藤学编程 🦉 | 一个正在努力学习的小博主,期待你的关注 |
|---|---|
实战代码系列最新文章😉 | C++实现图书管理系统(Qt C++ GUI界面版) |
SpringBoot实战系列🐷 | 【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案 |
分库分表 | 分库分表之实战-sharding-JDBC分库分表执行流程原理剖析 |
消息队列 | 深入浅出 RabbitMQ - 主题模式(Topic) |
前情摘要:
1、深入浅出 RabbitMQ-核心概念介绍与容器化部署 2、深入浅出 RabbitMQ-简单队列实战 3、深入浅出 RabbitMQ-工作队列实战(轮训策略VS公平策略) 4、深入浅出 RabbitMQ-交换机详解与发布订阅模型实战 4、深入浅出 RabbitMQ-路由模式详解 5、深入浅出 RabbitMQ - 主题模式(Topic)
安装RabbitMQ 首先确保已安装RabbitMQ服务并启动,可通过官方镜像或包管理工具安装。可参见深入浅出 RabbitMQ-核心概念介绍与容器化部署。
项目依赖
在SpringBoot项目的pom.xml中添加RabbitMQ Starter依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>SpringBoot通过yml配置文件自动注入RabbitMQ连接信息,无需手动创建连接工厂。
spring:
rabbitmq:
host: 192.168.229.128 # RabbitMQ服务器地址
port: 5672 # 通信端口(默认5672)
virtual-host: /dev # 虚拟主机(隔离不同环境的消息资源)
username: admin # 登录用户名
password: password # 登录密码配置说明:
virtual-host用于隔离不同项目的消息,避免命名冲突;port(默认5672)和virtual-host(默认/)。通过配置类定义交换机(Exchange)、队列(Queue)及绑定关系(Binding),这是RabbitMQ消息路由的核心。
@Configuration
public class RabbitMQConfig {
// 交换机名称
public static final String EXCHANGE_NAME = "order_exchange";
// 队列名称
public static final String QUEUE_NAME = "order_queue";
/**
* 声明主题交换机(Topic Exchange)
* 特点:按路由键模式匹配消息,支持通配符(*匹配一个词,#匹配多个词)
*/
@Bean
public Exchange orderExchange() {
// durable(true):交换机持久化(服务重启后不丢失)
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
/**
* 声明持久化队列
*/
@Bean
public Queue orderQueue() {
// durable(true):队列持久化(服务重启后队列仍存在)
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 绑定交换机与队列
* 路由键规则:order.#(匹配以order.开头的所有路由键,如order.new、order.pay等)
*/
@Bean
public Binding orderBinding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}核心概念解析:
使用RabbitTemplate发送消息到指定交换机,SpringBoot已自动配置该组件,可直接注入使用。
@SpringBootTest
class DemoApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息测试
*/
@Test
void sendMessage() {
// 参数1:交换机名称(从配置类引用,避免硬编码)
// 参数2:路由键(需符合绑定规则order.#)
// 参数3:消息内容(支持String、对象等类型)
rabbitTemplate.convertAndSend(
RabbitMQConfig.EXCHANGE_NAME,
"order.new",
"新订单来啦1"
);
System.out.println("消息发送成功");
}
}发送逻辑说明:
order.new符合绑定规则order.#,消息会被路由到order_queue;user.new),消息将被丢弃(除非配置了死信交换机);Serializable接口。通过@RabbitListener注解监听指定队列,自动接收并处理消息。
@Component
public class OrderMQListener {
/**
* 监听order_queue队列的消息
* @param msg 消息内容(自动转换为String类型)
* @param message 消息完整对象(包含属性、路由键等元数据)
*/
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME) // 绑定监听的队列
@RabbitHandler // 处理消息的具体方法
public void handleOrderMessage(String msg, Message message) {
// 获取消息投递标签(用于手动确认消息)
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 打印消息详情
System.out.println("===== 接收到消息 =====");
System.out.println("消息标签:" + deliveryTag);
System.out.println("消息内容:" + msg);
System.out.println("消息元数据:" + message.getMessageProperties());
}
}消费逻辑说明:
@RabbitListener:指定监听的队列,支持同时监听多个队列;@RabbitHandler:标记消息处理方法,支持根据消息类型重载;acknowledge-mode: manual。sendMessage方法,发送测试消息;
durable(true),避免服务重启后组件丢失;*和#的通配规则,否则消息无法路由;Jackson2JsonMessageConverter);本文通过实战案例演示了SpringBoot2.X整合RabbitMQ的核心步骤,从配置连接、定义组件到实现消息的生产与消费。关键在于理解交换机、队列、路由键的协作关系,以及SpringBoot自动配置带来的简化。后续可进一步学习消息确认机制、死信队列、延迟消息等高级特性,提升消息队列的可靠性。
觉得有用请点赞收藏! 如果有相关问题,欢迎评论区留言讨论~