Skip to content

一、基础概念

1. Redis 是什么?有什么特点?

  • 定义:Redis(Remote Dictionary Server)是一个开源的、基于内存的 键值存储系统,支持多种数据结构,常用于缓存、消息队列、会话存储等场景。
  • 核心特点:
    • ✅ 高性能:数据在内存中操作,读写速度极快(10万+/秒)。
    • ✅ 丰富数据结构:String, Hash, List, Set, Sorted Set, Bitmap, HyperLogLog, Geo, Stream。
    • ✅ 持久化:支持 RDB 和 AOF,保证数据不丢失。
    • ✅ 高可用:支持主从复制、哨兵(Sentinel)、集群(Cluster)。
    • ✅ 原子性操作:所有命令都是原子的,支持事务和 Lua 脚本。
    • ✅ 单线程模型:避免了多线程的上下文切换和锁竞争(网络 I/O 多线程从 Redis 6.0 开始引入)。

2. Redis 和 Memcached 的区别?

特性RedisMemcached
数据结构丰富(String, Hash, List, Set, ZSet 等)仅 String
持久化支持 RDB/AOF不支持
高可用支持主从、哨兵、集群需外部实现
内存管理自动回收过期键LRU 淘汰
并发模型单线程(命令执行) + 多 I/O 线程(6.0+)多线程
分布式原生支持 Cluster客户端分片
应用场景缓存、队列、计数器、排行榜纯缓存
  • 总结:Redis 功能更全面,Memcached 更轻量、专注缓存。

二、数据结构与使用场景

3. Redis 有哪些数据类型?各自的应用场景?

数据类型应用场景
String缓存对象(JSON)、计数器(INCR)、分布式锁(SETNX)、限流(INCR + EXPIRE)
Hash存储对象(如用户信息 user:1001 {name: "Alice", age: 30}),比多个 String 更节省内存
List消息队列(LPUSH + RPOP)、最新消息列表(如朋友圈动态)
Set去重(如标签、IP 黑名单)、共同好友、抽奖(SPOP)
Sorted Set (ZSet)排行榜(ZADD score member)、带权重的任务队列、延迟队列(时间戳为 score)
Bitmap用户签到(每天一位)、活跃用户统计(BITCOUNT)
HyperLogLog海量数据去重统计(如 UV 统计),误差率约 0.81%
Geo地理位置查询(附近的人、打车距离计算)
Stream消息队列(类似 Kafka),支持消费者组、消息回溯

4. 如何用 Redis 实现分布式锁?注意事项?

# 获取锁
SET lock_key unique_value NX EX 10
# NX: 键不存在时才设置
# EX: 设置 10 秒过期时间,防止死锁

# 释放锁(需保证原子性)
EVAL "
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
" 1 lock_key unique_value
  • 注意事项: ✅ 锁过期时间:合理设置,避免业务未执行完锁就释放。 ✅ 原子性释放:必须检查 value 是否为自己持有,防止误删。 ✅ Redlock 算法:用于多节点环境,提高可靠性(但有争议)。 ✅ 推荐方案:使用 Redisson 客户端,支持可重入锁、看门狗自动续期。

三、持久化机制

5. RDB 和 AOF 的区别?如何选择?

特性RDB(快照)AOF(追加日志)
原理定时保存内存快照(SAVE/BGSAVE)记录每个写命令
文件大小小,压缩二进制格式大,文本格式
恢复速度慢(需重放命令)
数据安全性可能丢失最后一次快照后的数据更高(appendfsync everysec 最多丢 1 秒)
启动速度
配置save 900 1 等appendonly yes
  • 选择建议: ✅ 仅 RDB:允许少量数据丢失,追求高性能和快速恢复。 ✅ 仅 AOF:对数据安全性要求高。 ✅ 两者共存:优先加载 AOF(更完整),RDB 作为备份。

四、高可用与集群

6. Redis 主从复制的原理?

  • 作用:数据冗余、读写分离、高可用基础。
  • 流程: 从节点发送 PSYNC 命令给主节点。 全量同步(首次或断线太久): 主节点执行 BGSAVE 生成 RDB。 将 RDB 发送给从节点。 从节点加载 RDB。 主节点将缓冲区中的写命令发送给从节点。 增量同步(正常情况): 主节点将写命令实时发送给从节点(异步复制)。
  • 缺点:主节点故障后,需手动切换从节点为主,无法自动故障转移。

7. Redis Sentinel(哨兵)的作用?

  • 目标:实现 高可用,自动故障转移。
  • 功能: ✅ 监控:持续检查主从节点是否正常。 ✅ 通知:异常时通过 API 或脚本通知管理员。 ✅ 自动故障转移: 主节点宕机,哨兵选举一个从节点提升为新主。 其他从节点指向新主。 更新客户端连接信息。 ✅ 配置提供者:客户端通过哨兵获取当前主节点地址。
  • 部署:建议至少 3 个哨兵节点,避免脑裂。

8. Redis Cluster 的工作原理?

  • 目标:解决单机 Redis 内存和 QPS 瓶颈,实现分布式。
  • 核心机制: ✅ 分片(Sharding):数据被分散到 16384 个哈希槽(hash slots)。 ✅ 节点分配:每个主节点负责一部分槽(如 node1: 0-5500, node2: 5501-11000)。 ✅ 客户端路由:客户端直接连接任一节点,节点返回 MOVED 或 ASK 重定向。 ✅ 高可用:每个主节点有 1 个或多个从节点,故障时自动选举。 ✅ Gossip 协议:节点间通过 Gossip 交换集群状态。
  • 优点:无中心节点,扩展性强。
  • 限制: ❌ 不支持多数据库(SELECT db)。 ❌ KEYS * 等操作受限。 ❌ 事务和 Lua 脚本只能操作同一个节点上的 key。

五、性能与优化

9. Redis 是单线程的吗?为什么这么快?

命令执行是单线程的(Redis 6.0 前),避免了多线程的锁和上下文切换开销。 从 6.0 开始:网络 I/O 使用多线程(io-threads),但命令处理仍是单线程。

  • 为什么快: ✅ 内存操作:数据在内存中,访问速度快。 ✅ 高效数据结构:如跳跃表(Skip List)实现 ZSet。 ✅ I/O 多路复用:使用 epoll(Linux)、kqueue(macOS)处理大量并发连接。 ✅ 单线程模型:无锁竞争,简单高效。

10. 如何优化 Redis 性能?

✅ 合理设计 key:避免过长 key,使用前缀区分业务(如 order:1001)。
✅ 批量操作:使用 MGET, MSET, Pipeline 减少网络往返。
✅ 避免大 key:大 string、big hash/list/set 会导致阻塞主线程。拆分或使用 SCAN。
✅ 设置合理的过期时间:防止内存泄漏。
✅ 禁用危险命令:如 KEYS *,改用 SCAN。
✅ 使用连接池:减少连接创建开销。
✅ 监控慢查询:slowlog get 查看执行时间 > slowlog-log-slower-than 的命令。

六、缓存设计

11. 缓存穿透、击穿、雪崩的区别?如何解决?

问题定义解决方案
缓存穿透查询不存在的数据,缓存和 DB 都查不到,频繁请求打到 DB。✅ 布隆过滤器(Bloom Filter):先判断 key 是否可能存在。✅ 缓存空值:对不存在的 key 也缓存 null(设短过期时间)。
缓存击穿热点 key 过期瞬间,大量请求同时打到 DB。✅ 永不过期(逻辑过期)✅ 互斥锁:只让一个线程重建缓存。
缓存雪崩大量 key 同时过期,或 Redis 宕机,导致请求全部打到 DB。✅ 过期时间加随机值(如 1h ± 10min)✅ 高可用架构:主从、哨兵、集群 ✅ 服务降级 & 熔断(如 Hystrix)

12. 如何保证 Redis 和数据库的数据一致性?

  • 没有强一致性方案,只能最终一致。
  • 常用策略:
    • 先更新 DB,再删除缓存(推荐): 优点:简单,缓存命中率高。 风险:删除失败可能导致不一致。 改进:双删 + 延迟:更新 DB → 删缓存 → 延迟 1s → 再删一次(防旧数据写入)。
    • 先删除缓存,再更新 DB(延迟双删): 适用于缓存更新成本高的场景。
    • 订阅 Binlog(如阿里 Canal): 监听数据库变更,自动更新/删除缓存。 实现最终一致性。

七、其他高级问题

13. Redis 内存回收策略有哪些?

通过 maxmemory-policy 配置:

策略说明
noeviction内存满时报错(默认)
allkeys-lru对所有 key 使用 LRU 算法淘汰
volatile-lru仅对设置了过期时间的 key 使用 LRU
allkeys-random随机淘汰任意 key
volatile-random随机淘汰设置了过期时间的 key
volatile-ttl优先淘汰剩余时间短的 key
  • ✅ 推荐:allkeys-lru(通用缓存场景)。

14. 什么是缓存预热?为什么要预热?

  • 定义:在系统上线或高峰期前,提前将热点数据加载到 Redis。
  • 目的: 避免冷启动时大量请求打到数据库。 提升用户体验。
  • 方法: 启动脚本批量 MSET。 通过日志分析热点数据。 使用定时任务定期预热。

15. Redis 如何实现延时队列?

  • 使用 Sorted Set(ZSet): score = 执行时间戳(毫秒)。 member = 任务 ID 或内容。
  • 生产者:
ZADD delay_queue <timestamp> "task:1"
  • 消费者(轮询):
# 获取当前时间前的所有任务
ZRANGEBYSCORE delay_queue 0 <current_timestamp>
# 处理任务后删除
ZREM delay_queue "task:1"
  • ✅ 改进:结合 Blocking 操作或外部调度器提高效率。

八、实际应用场景

1. .NET Core 集成示例

安装依赖:

bash
dotnet add package StackExchange.Redis

连接 Redis:

csharp
using StackExchange.Redis;

var connection = ConnectionMultiplexer.Connect("localhost:6379");
var db = connection.GetDatabase();

基本操作:

csharp
// String
await db.StringSetAsync("key", "value");
var value = await db.StringGetAsync("key");

// Hash
await db.HashSetAsync("user:1001", new HashEntry[]
{
    new("name", "Alice"),
    new("age", "30")
});
var name = await db.HashGetAsync("user:1001", "name");

// List
await db.ListLeftPushAsync("queue", "item1");
var item = await db.ListRightPopAsync("queue");

// Set
await db.SetAddAsync("tags", "redis");
await db.SetAddAsync("tags", "csharp");
var tags = await db.SetMembersAsync("tags");

// Sorted Set
await db.SortedSetAddAsync("leaderboard", "user1", 100);
var topUsers = await db.SortedSetRangeByRankAsync("leaderboard", 0, 9, Order.Descending);

2. 分布式锁实现(.NET Core)

csharp
public class RedisDistributedLock
{
    private readonly IDatabase _db;
    
    public RedisDistributedLock(IDatabase db)
    {
        _db = db;
    }
    
    public async Task<bool> TryLockAsync(string key, string value, TimeSpan expiry)
    {
        return await _db.StringSetAsync(
            key, 
            value, 
            expiry, 
            When.NotExists, 
            CommandFlags.None);
    }
    
    public async Task<bool> ReleaseLockAsync(string key, string value)
    {
        const string script = @"
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end";
        
        var result = await _db.ScriptEvaluateAsync(
            script, 
            new RedisKey[] { key }, 
            new RedisValue[] { value });
        
        return (int)result == 1;
    }
}

// 使用示例
var lockKey = "lock:resource:1";
var lockValue = Guid.NewGuid().ToString();
var expiry = TimeSpan.FromSeconds(10);

if (await distributedLock.TryLockAsync(lockKey, lockValue, expiry))
{
    try
    {
        // 执行需要加锁的操作
        await DoWorkAsync();
    }
    finally
    {
        await distributedLock.ReleaseLockAsync(lockKey, lockValue);
    }
}

3. 缓存实现(.NET Core)

csharp
public class RedisCacheService
{
    private readonly IDatabase _db;
    private readonly ConnectionMultiplexer _redis;
    
    public RedisCacheService(ConnectionMultiplexer redis)
    {
        _redis = redis;
        _db = redis.GetDatabase();
    }
    
    public async Task<T> GetOrSetAsync<T>(
        string key, 
        Func<Task<T>> factory, 
        TimeSpan? expiry = null)
    {
        var value = await _db.StringGetAsync(key);
        
        if (value.HasValue)
        {
            return JsonSerializer.Deserialize<T>(value);
        }
        
        var result = await factory();
        var serialized = JsonSerializer.Serialize(result);
        
        if (expiry.HasValue)
        {
            await _db.StringSetAsync(key, serialized, expiry.Value);
        }
        else
        {
            await _db.StringSetAsync(key, serialized);
        }
        
        return result;
    }
    
    public async Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
    {
        var serialized = JsonSerializer.Serialize(value);
        
        if (expiry.HasValue)
        {
            await _db.StringSetAsync(key, serialized, expiry.Value);
        }
        else
        {
            await _db.StringSetAsync(key, serialized);
        }
    }
    
    public async Task<T> GetAsync<T>(string key)
    {
        var value = await _db.StringGetAsync(key);
        return value.HasValue ? JsonSerializer.Deserialize<T>(value) : default(T);
    }
    
    public async Task RemoveAsync(string key)
    {
        await _db.KeyDeleteAsync(key);
    }
}

4. 布隆过滤器实现(防缓存穿透)

csharp
// 使用 StackExchange.Redis 的布隆过滤器(需要 RedisBloom 模块)
public class BloomFilterService
{
    private readonly IDatabase _db;
    
    public BloomFilterService(IDatabase db)
    {
        _db = db;
    }
    
    // 添加元素
    public async Task<bool> AddAsync(string filterName, string value)
    {
        return await _db.ExecuteAsync("BF.ADD", filterName, value).ContinueWith(
            task => (bool)task.Result);
    }
    
    // 检查元素是否存在
    public async Task<bool> ExistsAsync(string filterName, string value)
    {
        return await _db.ExecuteAsync("BF.EXISTS", filterName, value).ContinueWith(
            task => (bool)task.Result);
    }
    
    // 创建布隆过滤器
    public async Task<bool> CreateAsync(string filterName, long capacity, double errorRate)
    {
        return await _db.ExecuteAsync(
            "BF.RESERVE", 
            filterName, 
            errorRate.ToString("F2"), 
            capacity).ContinueWith(
                task => (string)task.Result == "OK");
    }
}

// 使用示例:防止缓存穿透
public async Task<User> GetUserAsync(int userId)
{
    var cacheKey = $"user:{userId}";
    var filterName = "user_ids";
    
    // 先检查布隆过滤器
    if (!await _bloomFilter.ExistsAsync(filterName, userId.ToString()))
    {
        return null; // 肯定不存在,直接返回
    }
    
    // 从缓存获取
    var user = await _cache.GetAsync<User>(cacheKey);
    if (user != null)
    {
        return user;
    }
    
    // 从数据库获取
    user = await _userRepository.GetByIdAsync(userId);
    if (user != null)
    {
        // 添加到布隆过滤器和缓存
        await _bloomFilter.AddAsync(filterName, userId.ToString());
        await _cache.SetAsync(cacheKey, user, TimeSpan.FromMinutes(30));
    }
    else
    {
        // 缓存空值,防止穿透
        await _cache.SetAsync(cacheKey, (User)null, TimeSpan.FromMinutes(5));
    }
    
    return user;
}

5. 延迟队列实现(.NET Core)

csharp
public class RedisDelayQueue
{
    private readonly IDatabase _db;
    private readonly string _queueKey;
    
    public RedisDelayQueue(IDatabase db, string queueKey)
    {
        _db = db;
        _queueKey = queueKey;
    }
    
    // 添加延迟任务
    public async Task AddTaskAsync(string taskId, string taskData, DateTime executeTime)
    {
        var score = ((DateTimeOffset)executeTime).ToUnixTimeMilliseconds();
        await _db.SortedSetAddAsync(_queueKey, taskId, score);
        await _db.StringSetAsync($"task:{taskId}", taskData);
    }
    
    // 获取到期的任务
    public async Task<List<string>> GetReadyTasksAsync()
    {
        var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var taskIds = await _db.SortedSetRangeByScoreAsync(_queueKey, 0, now);
        return taskIds.Select(id => id.ToString()).ToList();
    }
    
    // 移除已处理的任务
    public async Task RemoveTaskAsync(string taskId)
    {
        await _db.SortedSetRemoveAsync(_queueKey, taskId);
        await _db.KeyDeleteAsync($"task:{taskId}");
    }
    
    // 消费者轮询
    public async Task StartConsumerAsync(Func<string, string, Task> handler)
    {
        while (true)
        {
            var taskIds = await GetReadyTasksAsync();
            
            foreach (var taskId in taskIds)
            {
                var taskData = await _db.StringGetAsync($"task:{taskId}");
                if (taskData.HasValue)
                {
                    await handler(taskId, taskData);
                    await RemoveTaskAsync(taskId);
                }
            }
            
            await Task.Delay(1000); // 每秒检查一次
        }
    }
}

6. 限流实现(滑动窗口)

csharp
public class RedisRateLimiter
{
    private readonly IDatabase _db;
    
    public RedisRateLimiter(IDatabase db)
    {
        _db = db;
    }
    
    // 滑动窗口限流
    public async Task<bool> TryAcquireAsync(string key, int limit, TimeSpan window)
    {
        var now = DateTimeOffset.UtcNow;
        var windowStart = now - window;
        var windowStartMs = windowStart.ToUnixTimeMilliseconds();
        
        // 使用 Sorted Set 存储请求时间戳
        var pipeline = _db.CreateBatch();
        
        // 移除窗口外的记录
        pipeline.SortedSetRemoveRangeByScoreAsync(key, 0, windowStartMs);
        
        // 添加当前请求
        pipeline.SortedSetAddAsync(key, now.ToUnixTimeMilliseconds().ToString(), now.ToUnixTimeMilliseconds());
        
        // 获取窗口内的请求数
        var countTask = pipeline.SortedSetLengthAsync(key);
        
        pipeline.Execute();
        
        var count = await countTask;
        
        // 设置过期时间
        if (count == 1)
        {
            await _db.KeyExpireAsync(key, window);
        }
        
        return count <= limit;
    }
    
    // 固定窗口限流
    public async Task<bool> TryAcquireFixedAsync(string key, int limit, TimeSpan window)
    {
        var now = DateTimeOffset.UtcNow;
        var windowKey = $"{key}:{now.ToUnixTimeSeconds() / (int)window.TotalSeconds}";
        
        var count = await _db.StringIncrementAsync(windowKey);
        
        if (count == 1)
        {
            await _db.KeyExpireAsync(windowKey, window);
        }
        
        return count <= limit;
    }
}

// 使用示例
var limiter = new RedisRateLimiter(db);

// 每 1 分钟最多 100 次请求
if (await limiter.TryAcquireAsync("rate_limit:user:1001", 100, TimeSpan.FromMinutes(1)))
{
    // 允许请求
}
else
{
    // 限流,拒绝请求
    return new StatusCodeResult(429); // Too Many Requests
}

九、最佳实践补充

  1. 使用连接池 - 复用连接,减少开销
  2. 批量操作 - 使用 Pipeline 或 MGET/MSET
  3. 设置合理的过期时间 - 防止内存泄漏
  4. 监控内存使用 - 及时发现问题
  5. 使用 Lua 脚本 - 保证原子性操作
  6. 实现重试机制 - 网络故障时自动重试
  7. 使用 Sentinel 或 Cluster - 实现高可用
  8. 避免大 key - 拆分或使用 SCAN
  9. 不要在生产环境使用 KEYS - 使用 SCAN 代替
  10. 避免频繁的过期键操作 - 影响性能

基于 VitePress 构建 | Copyright © 2026-present