Skip to content

RabbitMQ 面试题

一、RabbitMQ 基础

1. RabbitMQ 是什么?

RabbitMQ 是一个开源的消息代理(Message Broker)软件,实现了高级消息队列协议(AMQP)。

核心概念:

  • 生产者(Producer) - 发送消息的应用程序
  • 消费者(Consumer) - 接收消息的应用程序
  • 队列(Queue) - 存储消息的缓冲区
  • 交换机(Exchange) - 路由消息到队列
  • 绑定(Binding) - 连接交换机和队列

2. RabbitMQ vs Kafka vs RocketMQ

特性RabbitMQKafkaRocketMQ
协议AMQP自定义协议自定义协议
消息顺序单队列保证分区内保证单队列保证
吞吐量中等(万级)高(十万级)高(十万级)
延迟低(毫秒级)高(秒级)低(毫秒级)
持久化支持支持支持
事务支持支持支持
适用场景消息队列、任务分发日志收集、流处理订单、支付等业务

3. RabbitMQ 的优势

可靠性 - 支持消息持久化、确认机制
灵活路由 - 多种交换机类型
集群支持 - 高可用和负载均衡
管理界面 - Web 管理控制台
多语言支持 - 丰富的客户端库
插件系统 - 可扩展性强

二、核心概念

1. 交换机类型(Exchange Types)

Direct Exchange(直连交换机):

  • 根据路由键(routing key)精确匹配
  • 路由键完全匹配才投递
csharp
// 声明交换机
channel.ExchangeDeclare("direct_exchange", ExchangeType.Direct);

// 绑定队列
channel.QueueBind("queue1", "direct_exchange", "error");
channel.QueueBind("queue2", "direct_exchange", "info");

// 发送消息
channel.BasicPublish("direct_exchange", "error", null, message);
// 只有 queue1 会收到消息

Topic Exchange(主题交换机):

  • 根据路由键模式匹配
  • 支持通配符:*(匹配一个单词)、#(匹配零个或多个单词)
csharp
// 声明交换机
channel.ExchangeDeclare("topic_exchange", ExchangeType.Topic);

// 绑定队列
channel.QueueBind("queue1", "topic_exchange", "*.error");
channel.QueueBind("queue2", "topic_exchange", "user.#");

// 发送消息
channel.BasicPublish("topic_exchange", "order.error", null, message);
// queue1 会收到消息(匹配 *.error)

channel.BasicPublish("topic_exchange", "user.create", null, message);
// queue2 会收到消息(匹配 user.#)

Fanout Exchange(扇出交换机):

  • 广播模式,忽略路由键
  • 发送到所有绑定的队列
csharp
// 声明交换机
channel.ExchangeDeclare("fanout_exchange", ExchangeType.Fanout);

// 绑定队列
channel.QueueBind("queue1", "fanout_exchange", "");
channel.QueueBind("queue2", "fanout_exchange", "");

// 发送消息
channel.BasicPublish("fanout_exchange", "", null, message);
// queue1 和 queue2 都会收到消息

Headers Exchange(头交换机):

  • 根据消息头(headers)匹配
  • 忽略路由键
csharp
// 声明交换机
channel.ExchangeDeclare("headers_exchange", ExchangeType.Headers);

// 绑定队列(匹配所有头部)
var headers = new Dictionary<string, object>
{
    { "type", "user" },
    { "action", "create" }
};
channel.QueueBind("queue1", "headers_exchange", "", headers);

// 发送消息
var props = channel.CreateBasicProperties();
props.Headers = headers;
channel.BasicPublish("headers_exchange", "", props, message);

2. 消息持久化

队列持久化:

csharp
// 声明持久化队列
channel.QueueDeclare("durable_queue", durable: true, exclusive: false, autoDelete: false);

消息持久化:

csharp
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 持久化消息

channel.BasicPublish("exchange", "routing_key", properties, message);

交换机持久化:

csharp
channel.ExchangeDeclare("durable_exchange", ExchangeType.Direct, durable: true);

注意:

  • ⚠️ 只有队列、交换机和消息都持久化,消息才不会丢失
  • ⚠️ 持久化会影响性能(写入磁盘)

3. 消息确认机制(Acknowledgment)

消费者确认:

csharp
// 关闭自动确认
channel.BasicConsume("queue", autoAck: false, consumer: consumer);

// 手动确认
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

// 拒绝消息(重新入队)
channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);

// 拒绝消息(不重新入队)
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);

生产者确认(Publisher Confirms):

csharp
// 开启生产者确认
channel.ConfirmSelect();

// 异步确认
channel.BasicAcks += (sender, ea) =>
{
    // 消息确认成功
    Console.WriteLine($"Message {ea.DeliveryTag} confirmed");
};

channel.BasicNacks += (sender, ea) =>
{
    // 消息确认失败
    Console.WriteLine($"Message {ea.DeliveryTag} nacked");
};

// 发送消息
channel.BasicPublish("exchange", "routing_key", null, message);

4. 死信队列(Dead Letter Queue)

死信队列(DLQ)应用场景:

  • 消息被拒绝且不重新入队
  • 消息 TTL 过期
  • 队列达到最大长度
csharp
// 声明死信交换机
channel.ExchangeDeclare("dlx_exchange", ExchangeType.Direct);

// 声明死信队列
channel.QueueDeclare("dlq", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("dlq", "dlx_exchange", "dlq_routing_key");

// 声明正常队列,绑定死信交换机
var args = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "dlx_exchange" },
    { "x-dead-letter-routing-key", "dlq_routing_key" },
    { "x-message-ttl", 60000 } // 消息过期时间 60 秒
};
channel.QueueDeclare("normal_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);

三、高级特性

1. TTL(Time To Live)

消息 TTL:

csharp
var properties = channel.CreateBasicProperties();
properties.Expiration = "60000"; // 60 秒后过期

channel.BasicPublish("exchange", "routing_key", properties, message);

队列 TTL:

csharp
var args = new Dictionary<string, object>
{
    { "x-message-ttl", 60000 } // 队列中所有消息的 TTL
};
channel.QueueDeclare("ttl_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);

2. 延迟队列(Delayed Queue)

实现方式:

  1. TTL + 死信队列
  2. RabbitMQ 延迟消息插件(rabbitmq-delayed-message-exchange)
csharp
// 使用延迟插件
var args = new Dictionary<string, object>
{
    { "x-delayed-type", "direct" }
};
channel.ExchangeDeclare("delayed_exchange", "x-delayed-message", durable: true, arguments: args);

var headers = new Dictionary<string, object>
{
    { "x-delay", 5000 } // 延迟 5 秒
};
var properties = channel.CreateBasicProperties();
properties.Headers = headers;

channel.BasicPublish("delayed_exchange", "routing_key", properties, message);

3. 优先级队列

csharp
// 声明优先级队列
var args = new Dictionary<string, object>
{
    { "x-max-priority", 10 } // 最大优先级 10
};
channel.QueueDeclare("priority_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);

// 发送优先级消息
var properties = channel.CreateBasicProperties();
properties.Priority = 5; // 优先级 5

channel.BasicPublish("exchange", "routing_key", properties, message);

4. 消费者预取(Prefetch)

限制未确认消息数量:

csharp
// 设置预取数量
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// prefetchCount: 最多接收 1 条未确认消息

channel.BasicConsume("queue", autoAck: false, consumer: consumer);

作用:

  • ✅ 公平分发(Fair Dispatch)- 防止某个消费者处理慢,导致消息堆积
  • ✅ 负载均衡 - 消息会分发给空闲的消费者

四、集群和高可用

1. RabbitMQ 集群

集群类型:

  • 普通集群 - 元数据同步,消息不冗余
  • 镜像队列 - 消息在多个节点间复制(高可用)

集群配置:

bash
# 节点 1
rabbitmq-server -detached

# 节点 2
rabbitmq-server -detached
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# 查看集群状态
rabbitmqctl cluster_status

镜像队列:

csharp
// 声明镜像队列策略
var args = new Dictionary<string, object>
{
    { "ha-mode", "all" }, // 镜像到所有节点
    { "ha-sync-mode", "automatic" } // 自动同步
};

channel.ExchangeDeclare("ha_exchange", ExchangeType.Direct, durable: true);
channel.QueueDeclare("ha_queue", durable: true, exclusive: false, autoDelete: false, arguments: args);

2. 高可用方案

方案一:镜像队列(推荐)

  • 消息在多个节点复制
  • 节点故障自动切换
  • 配置简单

方案二:Federation/Shovel

  • 跨地域消息传递
  • 更灵活的消息路由

五、.NET Core 集成

1. 安装依赖

bash
dotnet add package RabbitMQ.Client

2. 生产者示例

csharp
using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "guest",
    Password = "guest"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// 声明交换机
channel.ExchangeDeclare("direct_exchange", ExchangeType.Direct, durable: true);

// 声明队列
channel.QueueDeclare("queue1", durable: true, exclusive: false, autoDelete: false);

// 绑定队列
channel.QueueBind("queue1", "direct_exchange", "routing_key");

// 发送消息
var message = "Hello RabbitMQ";
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish("direct_exchange", "routing_key", properties, body);
Console.WriteLine($"发送消息: {message}");

3. 消费者示例

csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

var factory = new ConnectionFactory
{
    HostName = "localhost",
    UserName = "guest",
    Password = "guest"
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// 声明队列(确保队列存在)
channel.QueueDeclare("queue1", durable: true, exclusive: false, autoDelete: false);

// 设置预取数量
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    
    Console.WriteLine($"接收消息: {message}");
    
    // 模拟处理时间
    Thread.Sleep(1000);
    
    // 手动确认
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume("queue1", autoAck: false, consumer: consumer);

Console.WriteLine("按 [Enter] 退出");
Console.ReadLine();

4. 使用 MassTransit(推荐)

bash
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
csharp
using MassTransit;

var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("localhost", h =>
    {
        h.Username("guest");
        h.Password("guest");
    });
    
    cfg.ReceiveEndpoint("queue1", e =>
    {
        e.Consumer<MessageConsumer>();
    });
});

await bus.StartAsync();

// 发布消息
var endpoint = await bus.GetSendEndpoint(new Uri("rabbitmq://localhost/queue1"));
await endpoint.Send(new Message { Content = "Hello" });

public class Message
{
    public string Content { get; set; }
}

public class MessageConsumer : IConsumer<Message>
{
    public async Task Consume(ConsumeContext<Message> context)
    {
        Console.WriteLine($"接收消息: {context.Message.Content}");
    }
}

六、常见面试题

Q1: 如何保证消息不丢失?

三个环节:

  1. 生产者 - 使用事务或 Publisher Confirms
  2. Broker - 消息持久化(队列、交换机、消息)
  3. 消费者 - 手动确认(autoAck: false)

Q2: 如何处理消息重复消费?

幂等性设计:

  • ✅ 使用唯一 ID(如订单号)
  • ✅ 数据库唯一约束
  • ✅ 分布式锁(Redis)
  • ✅ 状态机(已处理、处理中、未处理)
csharp
// 使用 Redis 实现幂等性
var key = $"processed:{messageId}";
if (await redis.StringSetAsync(key, "1", TimeSpan.FromMinutes(5), When.NotExists))
{
    // 处理消息
    ProcessMessage(message);
}
else
{
    // 消息已处理,跳过
    Console.WriteLine("消息已处理,跳过");
}

Q3: 如何保证消息顺序?

单队列单消费者:

  • 一个队列只绑定一个消费者

单队列多消费者(使用预取):

csharp
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
// 每次只接收 1 条消息,处理完再接收下一条

多队列:

  • 按业务维度分队列(如按用户 ID 分队列)

Q4: 如何处理消息堆积?

  1. 增加消费者 - 横向扩展
  2. 批量处理 - 一次处理多条消息
  3. 增加队列容量 - 设置队列最大长度
  4. 限流 - 使用预取机制
  5. 死信队列 - 处理失败的消息
  6. 监控告警 - 及时发现堆积问题

Q5: RabbitMQ 和 Kafka 的区别?

特性RabbitMQKafka
设计目标消息队列流处理平台
消息模型点对点、发布订阅发布订阅
消息顺序单队列保证分区内保证
吞吐量中等(万级)高(十万级)
延迟低(毫秒级)高(秒级)
消息保留消费后删除可配置保留时间
适用场景任务分发、消息通知日志收集、流处理

七、最佳实践

  1. 使用持久化 - 队列、交换机、消息都持久化
  2. 手动确认 - 避免消息丢失
  3. 设置预取 - 公平分发,负载均衡
  4. 使用死信队列 - 处理失败消息
  5. 监控队列长度 - 及时发现堆积
  6. 使用镜像队列 - 实现高可用
  7. 幂等性设计 - 防止重复消费
  8. 合理设置 TTL - 避免消息堆积
  9. 避免长事务 - 影响性能
  10. 不要忽略错误处理 - 消息处理失败要有重试机制

基于 VitePress 构建 | Copyright © 2026-present