一、基础概念
1. Redis 是什么?有什么特点?
- 定义:Redis(Remote Dictionary Server)是一个开源的、基于内存的 键值存储系统,支持多种数据结构,常用于缓存、消息队列、会话存储等场景。
- 核心特点:
- ✅ 高性能:数据在内存中操作,读写速度极快(10万+/秒)。
- ✅ 丰富数据结构:String, Hash, List, Set, Sorted Set, Bitmap, HyperLogLog, Geo, Stream。
- ✅ 持久化:支持 RDB 和 AOF,保证数据不丢失。
- ✅ 高可用:支持主从复制、哨兵(Sentinel)、集群(Cluster)。
- ✅ 原子性操作:所有命令都是原子的,支持事务和 Lua 脚本。
- ✅ 单线程模型:避免了多线程的上下文切换和锁竞争(网络 I/O 多线程从 Redis 6.0 开始引入)。
2. Redis 和 Memcached 的区别?
| 特性 | Redis | Memcached |
|---|---|---|
| 数据结构 | 丰富(String, Hash, List, Set, ZSet 等) | 仅 String |
| 持久化 | 支持 RDB/AOF | 不支持 |
| 高可用 | 支持主从、哨兵、集群 | 需外部实现 |
| 内存管理 | 自动回收过期键 | LRU 淘汰 |
| 并发模型 | 单线程(命令执行) + 多 I/O 线程(6.0+) | 多线程 |
| 分布式 | 原生支持 Cluster | 客户端分片 |
| 应用场景 | 缓存、队列、计数器、排行榜 | 纯缓存 |
- 总结:Redis 功能更全面,Memcached 更轻量、专注缓存。
二、数据结构与使用场景
3. Redis 有哪些数据类型?各自的应用场景?
| 数据类型 | 应用场景 |
|---|---|
| String | 缓存对象(JSON)、计数器(INCR)、分布式锁(SETNX)、限流(INCR + EXPIRE) |
| Hash | 存储对象(如用户信息 user:1001 {name: "Alice", age: 30}),比多个 String 更节省内存 |
| List | 消息队列(LPUSH + RPOP)、最新消息列表(如朋友圈动态) |
| Set | 去重(如标签、IP 黑名单)、共同好友、抽奖(SPOP) |
| Sorted Set (ZSet) | 排行榜(ZADD score member)、带权重的任务队列、延迟队列(时间戳为 score) |
| Bitmap | 用户签到(每天一位)、活跃用户统计(BITCOUNT) |
| HyperLogLog | 海量数据去重统计(如 UV 统计),误差率约 0.81% |
| Geo | 地理位置查询(附近的人、打车距离计算) |
| Stream | 消息队列(类似 Kafka),支持消费者组、消息回溯 |
4. 如何用 Redis 实现分布式锁?注意事项?
# 获取锁
SET lock_key unique_value NX EX 10
# NX: 键不存在时才设置
# EX: 设置 10 秒过期时间,防止死锁
# 释放锁(需保证原子性)
EVAL "
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
" 1 lock_key unique_value- 注意事项: ✅ 锁过期时间:合理设置,避免业务未执行完锁就释放。 ✅ 原子性释放:必须检查 value 是否为自己持有,防止误删。 ✅ Redlock 算法:用于多节点环境,提高可靠性(但有争议)。 ✅ 推荐方案:使用 Redisson 客户端,支持可重入锁、看门狗自动续期。
三、持久化机制
5. RDB 和 AOF 的区别?如何选择?
| 特性 | RDB(快照) | AOF(追加日志) |
|---|---|---|
| 原理 | 定时保存内存快照(SAVE/BGSAVE) | 记录每个写命令 |
| 文件大小 | 小,压缩二进制格式 | 大,文本格式 |
| 恢复速度 | 快 | 慢(需重放命令) |
| 数据安全性 | 可能丢失最后一次快照后的数据 | 更高(appendfsync everysec 最多丢 1 秒) |
| 启动速度 | 快 | 慢 |
| 配置 | save 900 1 等 | appendonly yes |
- 选择建议: ✅ 仅 RDB:允许少量数据丢失,追求高性能和快速恢复。 ✅ 仅 AOF:对数据安全性要求高。 ✅ 两者共存:优先加载 AOF(更完整),RDB 作为备份。
四、高可用与集群
6. Redis 主从复制的原理?
- 作用:数据冗余、读写分离、高可用基础。
- 流程: 从节点发送 PSYNC 命令给主节点。 全量同步(首次或断线太久): 主节点执行 BGSAVE 生成 RDB。 将 RDB 发送给从节点。 从节点加载 RDB。 主节点将缓冲区中的写命令发送给从节点。 增量同步(正常情况): 主节点将写命令实时发送给从节点(异步复制)。
- 缺点:主节点故障后,需手动切换从节点为主,无法自动故障转移。
7. Redis Sentinel(哨兵)的作用?
- 目标:实现 高可用,自动故障转移。
- 功能: ✅ 监控:持续检查主从节点是否正常。 ✅ 通知:异常时通过 API 或脚本通知管理员。 ✅ 自动故障转移: 主节点宕机,哨兵选举一个从节点提升为新主。 其他从节点指向新主。 更新客户端连接信息。 ✅ 配置提供者:客户端通过哨兵获取当前主节点地址。
- 部署:建议至少 3 个哨兵节点,避免脑裂。
8. Redis Cluster 的工作原理?
- 目标:解决单机 Redis 内存和 QPS 瓶颈,实现分布式。
- 核心机制: ✅ 分片(Sharding):数据被分散到 16384 个哈希槽(hash slots)。 ✅ 节点分配:每个主节点负责一部分槽(如 node1: 0-5500, node2: 5501-11000)。 ✅ 客户端路由:客户端直接连接任一节点,节点返回 MOVED 或 ASK 重定向。 ✅ 高可用:每个主节点有 1 个或多个从节点,故障时自动选举。 ✅ Gossip 协议:节点间通过 Gossip 交换集群状态。
- 优点:无中心节点,扩展性强。
- 限制: ❌ 不支持多数据库(SELECT db)。 ❌ KEYS * 等操作受限。 ❌ 事务和 Lua 脚本只能操作同一个节点上的 key。
五、性能与优化
9. Redis 是单线程的吗?为什么这么快?
命令执行是单线程的(Redis 6.0 前),避免了多线程的锁和上下文切换开销。 从 6.0 开始:网络 I/O 使用多线程(io-threads),但命令处理仍是单线程。
- 为什么快: ✅ 内存操作:数据在内存中,访问速度快。 ✅ 高效数据结构:如跳跃表(Skip List)实现 ZSet。 ✅ I/O 多路复用:使用 epoll(Linux)、kqueue(macOS)处理大量并发连接。 ✅ 单线程模型:无锁竞争,简单高效。
10. 如何优化 Redis 性能?
✅ 合理设计 key:避免过长 key,使用前缀区分业务(如 order:1001)。
✅ 批量操作:使用 MGET, MSET, Pipeline 减少网络往返。
✅ 避免大 key:大 string、big hash/list/set 会导致阻塞主线程。拆分或使用 SCAN。
✅ 设置合理的过期时间:防止内存泄漏。
✅ 禁用危险命令:如 KEYS *,改用 SCAN。
✅ 使用连接池:减少连接创建开销。
✅ 监控慢查询:slowlog get 查看执行时间 > slowlog-log-slower-than 的命令。
六、缓存设计
11. 缓存穿透、击穿、雪崩的区别?如何解决?
| 问题 | 定义 | 解决方案 |
|---|---|---|
| 缓存穿透 | 查询不存在的数据,缓存和 DB 都查不到,频繁请求打到 DB。 | ✅ 布隆过滤器(Bloom Filter):先判断 key 是否可能存在。✅ 缓存空值:对不存在的 key 也缓存 null(设短过期时间)。 |
| 缓存击穿 | 热点 key 过期瞬间,大量请求同时打到 DB。 | ✅ 永不过期(逻辑过期)✅ 互斥锁:只让一个线程重建缓存。 |
| 缓存雪崩 | 大量 key 同时过期,或 Redis 宕机,导致请求全部打到 DB。 | ✅ 过期时间加随机值(如 1h ± 10min)✅ 高可用架构:主从、哨兵、集群 ✅ 服务降级 & 熔断(如 Hystrix) |
12. 如何保证 Redis 和数据库的数据一致性?
- 没有强一致性方案,只能最终一致。
- 常用策略:
- 先更新 DB,再删除缓存(推荐): 优点:简单,缓存命中率高。 风险:删除失败可能导致不一致。 改进:双删 + 延迟:更新 DB → 删缓存 → 延迟 1s → 再删一次(防旧数据写入)。
- 先删除缓存,再更新 DB(延迟双删): 适用于缓存更新成本高的场景。
- 订阅 Binlog(如阿里 Canal): 监听数据库变更,自动更新/删除缓存。 实现最终一致性。
七、其他高级问题
13. Redis 内存回收策略有哪些?
通过 maxmemory-policy 配置:
| 策略 | 说明 |
|---|---|
| noeviction | 内存满时报错(默认) |
| allkeys-lru | 对所有 key 使用 LRU 算法淘汰 |
| volatile-lru | 仅对设置了过期时间的 key 使用 LRU |
| allkeys-random | 随机淘汰任意 key |
| volatile-random | 随机淘汰设置了过期时间的 key |
| volatile-ttl | 优先淘汰剩余时间短的 key |
- ✅ 推荐:allkeys-lru(通用缓存场景)。
14. 什么是缓存预热?为什么要预热?
- 定义:在系统上线或高峰期前,提前将热点数据加载到 Redis。
- 目的: 避免冷启动时大量请求打到数据库。 提升用户体验。
- 方法: 启动脚本批量 MSET。 通过日志分析热点数据。 使用定时任务定期预热。
15. Redis 如何实现延时队列?
- 使用 Sorted Set(ZSet): score = 执行时间戳(毫秒)。 member = 任务 ID 或内容。
- 生产者:
ZADD delay_queue <timestamp> "task:1"- 消费者(轮询):
# 获取当前时间前的所有任务
ZRANGEBYSCORE delay_queue 0 <current_timestamp>
# 处理任务后删除
ZREM delay_queue "task:1"- ✅ 改进:结合 Blocking 操作或外部调度器提高效率。
八、实际应用场景
1. .NET Core 集成示例
安装依赖:
bash
dotnet add package StackExchange.Redis连接 Redis:
csharp
using StackExchange.Redis;
var connection = ConnectionMultiplexer.Connect("localhost:6379");
var db = connection.GetDatabase();基本操作:
csharp
// String
await db.StringSetAsync("key", "value");
var value = await db.StringGetAsync("key");
// Hash
await db.HashSetAsync("user:1001", new HashEntry[]
{
new("name", "Alice"),
new("age", "30")
});
var name = await db.HashGetAsync("user:1001", "name");
// List
await db.ListLeftPushAsync("queue", "item1");
var item = await db.ListRightPopAsync("queue");
// Set
await db.SetAddAsync("tags", "redis");
await db.SetAddAsync("tags", "csharp");
var tags = await db.SetMembersAsync("tags");
// Sorted Set
await db.SortedSetAddAsync("leaderboard", "user1", 100);
var topUsers = await db.SortedSetRangeByRankAsync("leaderboard", 0, 9, Order.Descending);2. 分布式锁实现(.NET Core)
csharp
public class RedisDistributedLock
{
private readonly IDatabase _db;
public RedisDistributedLock(IDatabase db)
{
_db = db;
}
public async Task<bool> TryLockAsync(string key, string value, TimeSpan expiry)
{
return await _db.StringSetAsync(
key,
value,
expiry,
When.NotExists,
CommandFlags.None);
}
public async Task<bool> ReleaseLockAsync(string key, string value)
{
const string script = @"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end";
var result = await _db.ScriptEvaluateAsync(
script,
new RedisKey[] { key },
new RedisValue[] { value });
return (int)result == 1;
}
}
// 使用示例
var lockKey = "lock:resource:1";
var lockValue = Guid.NewGuid().ToString();
var expiry = TimeSpan.FromSeconds(10);
if (await distributedLock.TryLockAsync(lockKey, lockValue, expiry))
{
try
{
// 执行需要加锁的操作
await DoWorkAsync();
}
finally
{
await distributedLock.ReleaseLockAsync(lockKey, lockValue);
}
}3. 缓存实现(.NET Core)
csharp
public class RedisCacheService
{
private readonly IDatabase _db;
private readonly ConnectionMultiplexer _redis;
public RedisCacheService(ConnectionMultiplexer redis)
{
_redis = redis;
_db = redis.GetDatabase();
}
public async Task<T> GetOrSetAsync<T>(
string key,
Func<Task<T>> factory,
TimeSpan? expiry = null)
{
var value = await _db.StringGetAsync(key);
if (value.HasValue)
{
return JsonSerializer.Deserialize<T>(value);
}
var result = await factory();
var serialized = JsonSerializer.Serialize(result);
if (expiry.HasValue)
{
await _db.StringSetAsync(key, serialized, expiry.Value);
}
else
{
await _db.StringSetAsync(key, serialized);
}
return result;
}
public async Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
var serialized = JsonSerializer.Serialize(value);
if (expiry.HasValue)
{
await _db.StringSetAsync(key, serialized, expiry.Value);
}
else
{
await _db.StringSetAsync(key, serialized);
}
}
public async Task<T> GetAsync<T>(string key)
{
var value = await _db.StringGetAsync(key);
return value.HasValue ? JsonSerializer.Deserialize<T>(value) : default(T);
}
public async Task RemoveAsync(string key)
{
await _db.KeyDeleteAsync(key);
}
}4. 布隆过滤器实现(防缓存穿透)
csharp
// 使用 StackExchange.Redis 的布隆过滤器(需要 RedisBloom 模块)
public class BloomFilterService
{
private readonly IDatabase _db;
public BloomFilterService(IDatabase db)
{
_db = db;
}
// 添加元素
public async Task<bool> AddAsync(string filterName, string value)
{
return await _db.ExecuteAsync("BF.ADD", filterName, value).ContinueWith(
task => (bool)task.Result);
}
// 检查元素是否存在
public async Task<bool> ExistsAsync(string filterName, string value)
{
return await _db.ExecuteAsync("BF.EXISTS", filterName, value).ContinueWith(
task => (bool)task.Result);
}
// 创建布隆过滤器
public async Task<bool> CreateAsync(string filterName, long capacity, double errorRate)
{
return await _db.ExecuteAsync(
"BF.RESERVE",
filterName,
errorRate.ToString("F2"),
capacity).ContinueWith(
task => (string)task.Result == "OK");
}
}
// 使用示例:防止缓存穿透
public async Task<User> GetUserAsync(int userId)
{
var cacheKey = $"user:{userId}";
var filterName = "user_ids";
// 先检查布隆过滤器
if (!await _bloomFilter.ExistsAsync(filterName, userId.ToString()))
{
return null; // 肯定不存在,直接返回
}
// 从缓存获取
var user = await _cache.GetAsync<User>(cacheKey);
if (user != null)
{
return user;
}
// 从数据库获取
user = await _userRepository.GetByIdAsync(userId);
if (user != null)
{
// 添加到布隆过滤器和缓存
await _bloomFilter.AddAsync(filterName, userId.ToString());
await _cache.SetAsync(cacheKey, user, TimeSpan.FromMinutes(30));
}
else
{
// 缓存空值,防止穿透
await _cache.SetAsync(cacheKey, (User)null, TimeSpan.FromMinutes(5));
}
return user;
}5. 延迟队列实现(.NET Core)
csharp
public class RedisDelayQueue
{
private readonly IDatabase _db;
private readonly string _queueKey;
public RedisDelayQueue(IDatabase db, string queueKey)
{
_db = db;
_queueKey = queueKey;
}
// 添加延迟任务
public async Task AddTaskAsync(string taskId, string taskData, DateTime executeTime)
{
var score = ((DateTimeOffset)executeTime).ToUnixTimeMilliseconds();
await _db.SortedSetAddAsync(_queueKey, taskId, score);
await _db.StringSetAsync($"task:{taskId}", taskData);
}
// 获取到期的任务
public async Task<List<string>> GetReadyTasksAsync()
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
var taskIds = await _db.SortedSetRangeByScoreAsync(_queueKey, 0, now);
return taskIds.Select(id => id.ToString()).ToList();
}
// 移除已处理的任务
public async Task RemoveTaskAsync(string taskId)
{
await _db.SortedSetRemoveAsync(_queueKey, taskId);
await _db.KeyDeleteAsync($"task:{taskId}");
}
// 消费者轮询
public async Task StartConsumerAsync(Func<string, string, Task> handler)
{
while (true)
{
var taskIds = await GetReadyTasksAsync();
foreach (var taskId in taskIds)
{
var taskData = await _db.StringGetAsync($"task:{taskId}");
if (taskData.HasValue)
{
await handler(taskId, taskData);
await RemoveTaskAsync(taskId);
}
}
await Task.Delay(1000); // 每秒检查一次
}
}
}6. 限流实现(滑动窗口)
csharp
public class RedisRateLimiter
{
private readonly IDatabase _db;
public RedisRateLimiter(IDatabase db)
{
_db = db;
}
// 滑动窗口限流
public async Task<bool> TryAcquireAsync(string key, int limit, TimeSpan window)
{
var now = DateTimeOffset.UtcNow;
var windowStart = now - window;
var windowStartMs = windowStart.ToUnixTimeMilliseconds();
// 使用 Sorted Set 存储请求时间戳
var pipeline = _db.CreateBatch();
// 移除窗口外的记录
pipeline.SortedSetRemoveRangeByScoreAsync(key, 0, windowStartMs);
// 添加当前请求
pipeline.SortedSetAddAsync(key, now.ToUnixTimeMilliseconds().ToString(), now.ToUnixTimeMilliseconds());
// 获取窗口内的请求数
var countTask = pipeline.SortedSetLengthAsync(key);
pipeline.Execute();
var count = await countTask;
// 设置过期时间
if (count == 1)
{
await _db.KeyExpireAsync(key, window);
}
return count <= limit;
}
// 固定窗口限流
public async Task<bool> TryAcquireFixedAsync(string key, int limit, TimeSpan window)
{
var now = DateTimeOffset.UtcNow;
var windowKey = $"{key}:{now.ToUnixTimeSeconds() / (int)window.TotalSeconds}";
var count = await _db.StringIncrementAsync(windowKey);
if (count == 1)
{
await _db.KeyExpireAsync(windowKey, window);
}
return count <= limit;
}
}
// 使用示例
var limiter = new RedisRateLimiter(db);
// 每 1 分钟最多 100 次请求
if (await limiter.TryAcquireAsync("rate_limit:user:1001", 100, TimeSpan.FromMinutes(1)))
{
// 允许请求
}
else
{
// 限流,拒绝请求
return new StatusCodeResult(429); // Too Many Requests
}九、最佳实践补充
- ✅ 使用连接池 - 复用连接,减少开销
- ✅ 批量操作 - 使用 Pipeline 或 MGET/MSET
- ✅ 设置合理的过期时间 - 防止内存泄漏
- ✅ 监控内存使用 - 及时发现问题
- ✅ 使用 Lua 脚本 - 保证原子性操作
- ✅ 实现重试机制 - 网络故障时自动重试
- ✅ 使用 Sentinel 或 Cluster - 实现高可用
- ❌ 避免大 key - 拆分或使用 SCAN
- ❌ 不要在生产环境使用 KEYS - 使用 SCAN 代替
- ❌ 避免频繁的过期键操作 - 影响性能