RabbitMQ 面试题
一、RabbitMQ 基础
1. RabbitMQ 是什么?
RabbitMQ 是一个开源的消息代理(Message Broker)软件,实现了高级消息队列协议(AMQP)。
核心概念:
- 生产者(Producer) - 发送消息的应用程序
- 消费者(Consumer) - 接收消息的应用程序
- 队列(Queue) - 存储消息的缓冲区
- 交换机(Exchange) - 路由消息到队列
- 绑定(Binding) - 连接交换机和队列
2. RabbitMQ vs Kafka vs RocketMQ
| 特性 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| 协议 | AMQP | 自定义协议 | 自定义协议 |
| 消息顺序 | 单队列保证 | 分区内保证 | 单队列保证 |
| 吞吐量 | 中等(万级) | 高(十万级) | 高(十万级) |
| 延迟 | 低(毫秒级) | 高(秒级) | 低(毫秒级) |
| 持久化 | 支持 | 支持 | 支持 |
| 事务 | 支持 | 支持 | 支持 |
| 适用场景 | 消息队列、任务分发 | 日志收集、流处理 | 订单、支付等业务 |
3. RabbitMQ 的优势
✅ 可靠性 - 支持消息持久化、确认机制
✅ 灵活路由 - 多种交换机类型
✅ 集群支持 - 高可用和负载均衡
✅ 管理界面 - Web 管理控制台
✅ 多语言支持 - 丰富的客户端库
✅ 插件系统 - 可扩展性强
二、核心概念
1. 交换机类型(Exchange Types)
Direct Exchange(直连交换机):
- 根据路由键(routing key)精确匹配
- 路由键完全匹配才投递
csharp
// 声明交换机
channel.ExchangeDeclare("direct_exchange", ExchangeType.Direct);
// 绑定队列
channel.QueueBind("queue1", "direct_exchange", "error");
channel.QueueBind("queue2", "direct_exchange", "info");
// 发送消息
channel.BasicPublish("direct_exchange", "error", null, message);
// 只有 queue1 会收到消息Topic Exchange(主题交换机):
- 根据路由键模式匹配
- 支持通配符:
*(匹配一个单词)、#(匹配零个或多个单词)
csharp
// 声明交换机
channel.ExchangeDeclare("topic_exchange", ExchangeType.Topic);
// 绑定队列
channel.QueueBind("queue1", "topic_exchange", "*.error");
channel.QueueBind("queue2", "topic_exchange", "user.#");
// 发送消息
channel.BasicPublish("topic_exchange", "order.error", null, message);
// queue1 会收到消息(匹配 *.error)
channel.BasicPublish("topic_exchange", "user.create", null, message);
// queue2 会收到消息(匹配 user.#)Fanout Exchange(扇出交换机):
- 广播模式,忽略路由键
- 发送到所有绑定的队列
csharp
// 声明交换机
channel.ExchangeDeclare("fanout_exchange", ExchangeType.Fanout);
// 绑定队列
channel.QueueBind("queue1", "fanout_exchange", "");
channel.QueueBind("queue2", "fanout_exchange", "");
// 发送消息
channel.BasicPublish("fanout_exchange", "", null, message);
// queue1 和 queue2 都会收到消息Headers Exchange(头交换机):
- 根据消息头(headers)匹配
- 忽略路由键
csharp
// 声明交换机
channel.ExchangeDeclare("headers_exchange", ExchangeType.Headers);
// 绑定队列(匹配所有头部)
var headers = new Dictionary<string, object>
{
{ "type", "user" },
{ "action", "create" }
};
channel.QueueBind("queue1", "headers_exchange", "", headers);
// 发送消息
var props = channel.CreateBasicProperties();
props.Headers = headers;
channel.BasicPublish("headers_exchange", "", props, message);2. 消息持久化
队列持久化:
csharp
// 声明持久化队列
channel.QueueDeclare("durable_queue", durable: true, exclusive: false, autoDelete: false);消息持久化:
csharp
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 持久化消息
channel.BasicPublish("exchange", "routing_key", properties, message);交换机持久化:
csharp
channel.ExchangeDeclare("durable_exchange", ExchangeType.Direct, durable: true);注意:
- ⚠️ 只有队列、交换机和消息都持久化,消息才不会丢失
- ⚠️ 持久化会影响性能(写入磁盘)
3. 消息确认机制(Acknowledgment)
消费者确认:
csharp
// 关闭自动确认
channel.BasicConsume("queue", autoAck: false, consumer: consumer);
// 手动确认
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
// 拒绝消息(重新入队)
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
// 拒绝消息(不重新入队)
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);生产者确认(Publisher Confirms):
csharp
// 开启生产者确认
channel.ConfirmSelect();
// 异步确认
channel.BasicAcks += (sender, ea) =>
{
// 消息确认成功
Console.WriteLine($"Message {ea.DeliveryTag} confirmed");
};
channel.BasicNacks += (sender, ea) =>
{
// 消息确认失败
Console.WriteLine($"Message {ea.DeliveryTag} nacked");
};
// 发送消息
channel.BasicPublish("exchange", "routing_key", null, message);4. 死信队列(Dead Letter Queue)
死信队列(DLQ)应用场景:
- 消息被拒绝且不重新入队
- 消息 TTL 过期
- 队列达到最大长度
csharp
// 声明死信交换机
channel.ExchangeDeclare("dlx_exchange", ExchangeType.Direct);
// 声明死信队列
channel.QueueDeclare("dlq", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("dlq", "dlx_exchange", "dlq_routing_key");
// 声明正常队列,绑定死信交换机
var args = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "dlx_exchange" },
{ "x-dead-letter-routing-key", "dlq_routing_key" },
{ "x-message-ttl", 60000 } // 消息过期时间 60 秒
};
channel.QueueDeclare("normal_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);三、高级特性
1. TTL(Time To Live)
消息 TTL:
csharp
var properties = channel.CreateBasicProperties();
properties.Expiration = "60000"; // 60 秒后过期
channel.BasicPublish("exchange", "routing_key", properties, message);队列 TTL:
csharp
var args = new Dictionary<string, object>
{
{ "x-message-ttl", 60000 } // 队列中所有消息的 TTL
};
channel.QueueDeclare("ttl_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);2. 延迟队列(Delayed Queue)
实现方式:
- TTL + 死信队列
- RabbitMQ 延迟消息插件(rabbitmq-delayed-message-exchange)
csharp
// 使用延迟插件
var args = new Dictionary<string, object>
{
{ "x-delayed-type", "direct" }
};
channel.ExchangeDeclare("delayed_exchange", "x-delayed-message", durable: true, arguments: args);
var headers = new Dictionary<string, object>
{
{ "x-delay", 5000 } // 延迟 5 秒
};
var properties = channel.CreateBasicProperties();
properties.Headers = headers;
channel.BasicPublish("delayed_exchange", "routing_key", properties, message);3. 优先级队列
csharp
// 声明优先级队列
var args = new Dictionary<string, object>
{
{ "x-max-priority", 10 } // 最大优先级 10
};
channel.QueueDeclare("priority_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);
// 发送优先级消息
var properties = channel.CreateBasicProperties();
properties.Priority = 5; // 优先级 5
channel.BasicPublish("exchange", "routing_key", properties, message);4. 消费者预取(Prefetch)
限制未确认消息数量:
csharp
// 设置预取数量
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// prefetchCount: 最多接收 1 条未确认消息
channel.BasicConsume("queue", autoAck: false, consumer: consumer);作用:
- ✅ 公平分发(Fair Dispatch)- 防止某个消费者处理慢,导致消息堆积
- ✅ 负载均衡 - 消息会分发给空闲的消费者
四、集群和高可用
1. RabbitMQ 集群
集群类型:
- 普通集群 - 元数据同步,消息不冗余
- 镜像队列 - 消息在多个节点间复制(高可用)
集群配置:
bash
# 节点 1
rabbitmq-server -detached
# 节点 2
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 查看集群状态
rabbitmqctl cluster_status镜像队列:
csharp
// 声明镜像队列策略
var args = new Dictionary<string, object>
{
{ "ha-mode", "all" }, // 镜像到所有节点
{ "ha-sync-mode", "automatic" } // 自动同步
};
channel.ExchangeDeclare("ha_exchange", ExchangeType.Direct, durable: true);
channel.QueueDeclare("ha_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);2. 高可用方案
方案一:镜像队列(推荐)
- 消息在多个节点复制
- 节点故障自动切换
- 配置简单
方案二:Federation/Shovel
- 跨地域消息传递
- 更灵活的消息路由
五、.NET Core 集成
1. 安装依赖
bash
dotnet add package RabbitMQ.Client2. 生产者示例
csharp
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory
{
HostName = "localhost",
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// 声明交换机
channel.ExchangeDeclare("direct_exchange", ExchangeType.Direct, durable: true);
// 声明队列
channel.QueueDeclare("queue1", durable: true, exclusive: false, autoDelete: false);
// 绑定队列
channel.QueueBind("queue1", "direct_exchange", "routing_key");
// 发送消息
var message = "Hello RabbitMQ";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish("direct_exchange", "routing_key", properties, body);
Console.WriteLine($"发送消息: {message}");3. 消费者示例
csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var factory = new ConnectionFactory
{
HostName = "localhost",
UserName = "guest",
Password = "guest"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// 声明队列(确保队列存在)
channel.QueueDeclare("queue1", durable: true, exclusive: false, autoDelete: false);
// 设置预取数量
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"接收消息: {message}");
// 模拟处理时间
Thread.Sleep(1000);
// 手动确认
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume("queue1", autoAck: false, consumer: consumer);
Console.WriteLine("按 [Enter] 退出");
Console.ReadLine();4. 使用 MassTransit(推荐)
bash
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQcsharp
using MassTransit;
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("localhost", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("queue1", e =>
{
e.Consumer<MessageConsumer>();
});
});
await bus.StartAsync();
// 发布消息
var endpoint = await bus.GetSendEndpoint(new Uri("rabbitmq://localhost/queue1"));
await endpoint.Send(new Message { Content = "Hello" });
public class Message
{
public string Content { get; set; }
}
public class MessageConsumer : IConsumer<Message>
{
public async Task Consume(ConsumeContext<Message> context)
{
Console.WriteLine($"接收消息: {context.Message.Content}");
}
}六、常见面试题
Q1: 如何保证消息不丢失?
三个环节:
- 生产者 - 使用事务或 Publisher Confirms
- Broker - 消息持久化(队列、交换机、消息)
- 消费者 - 手动确认(autoAck: false)
Q2: 如何处理消息重复消费?
幂等性设计:
- ✅ 使用唯一 ID(如订单号)
- ✅ 数据库唯一约束
- ✅ 分布式锁(Redis)
- ✅ 状态机(已处理、处理中、未处理)
csharp
// 使用 Redis 实现幂等性
var key = $"processed:{messageId}";
if (await redis.StringSetAsync(key, "1", TimeSpan.FromMinutes(5), When.NotExists))
{
// 处理消息
ProcessMessage(message);
}
else
{
// 消息已处理,跳过
Console.WriteLine("消息已处理,跳过");
}Q3: 如何保证消息顺序?
单队列单消费者:
- 一个队列只绑定一个消费者
单队列多消费者(使用预取):
csharp
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// 每次只接收 1 条消息,处理完再接收下一条多队列:
- 按业务维度分队列(如按用户 ID 分队列)
Q4: 如何处理消息堆积?
- 增加消费者 - 横向扩展
- 批量处理 - 一次处理多条消息
- 增加队列容量 - 设置队列最大长度
- 限流 - 使用预取机制
- 死信队列 - 处理失败的消息
- 监控告警 - 及时发现堆积问题
Q5: RabbitMQ 和 Kafka 的区别?
| 特性 | RabbitMQ | Kafka |
|---|---|---|
| 设计目标 | 消息队列 | 流处理平台 |
| 消息模型 | 点对点、发布订阅 | 发布订阅 |
| 消息顺序 | 单队列保证 | 分区内保证 |
| 吞吐量 | 中等(万级) | 高(十万级) |
| 延迟 | 低(毫秒级) | 高(秒级) |
| 消息保留 | 消费后删除 | 可配置保留时间 |
| 适用场景 | 任务分发、消息通知 | 日志收集、流处理 |
七、最佳实践
- ✅ 使用持久化 - 队列、交换机、消息都持久化
- ✅ 手动确认 - 避免消息丢失
- ✅ 设置预取 - 公平分发,负载均衡
- ✅ 使用死信队列 - 处理失败消息
- ✅ 监控队列长度 - 及时发现堆积
- ✅ 使用镜像队列 - 实现高可用
- ✅ 幂等性设计 - 防止重复消费
- ✅ 合理设置 TTL - 避免消息堆积
- ❌ 避免长事务 - 影响性能
- ❌ 不要忽略错误处理 - 消息处理失败要有重试机制