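Circuit Breaking and Rate Limiting
I. Circuit Breaker
1. What is a circuit breaker?
A circuit breaker is a design pattern that prevents cascading failures. When the failure rate of calls to a service reaches a threshold, the breaker "opens": it stops calling that service and returns an error immediately, so no more resources are spent on calls that are likely to fail.
Circuit breaker states:
- Closed - normal state; requests pass through
- Open - failure state; requests are rejected immediately
- Half-Open - recovery state; a limited number of trial requests are let through to test whether the service has recovered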
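The three states and their transitions can be sketched as a small state machine. This is a minimal, single-threaded illustration, not Polly's implementation: the threshold (3 consecutive failures), the 10-second break, the string state names and the Simulate helper are assumptions chosen for readability, and time is passed in as plain seconds so the run is deterministic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical consecutive-failure breaker (not Polly). Each call is
// (timestamp in seconds, whether the downstream call would succeed).
static List<string> Simulate((int At, bool Success)[] calls, int failureThreshold, int breakSeconds)
{
    var state = "Closed";
    var consecutiveFailures = 0;
    var openedAt = 0;
    var log = new List<string>();

    foreach (var (at, success) in calls)
    {
        if (state == "Open")
        {
            // Open: reject without calling until the break duration has elapsed
            if (at - openedAt < breakSeconds)
            {
                log.Add("Rejected");
                continue;
            }
            state = "HalfOpen"; // break elapsed: let this call through as a trial
        }

        if (success)
        {
            // A success closes the breaker and resets the failure count
            state = "Closed";
            consecutiveFailures = 0;
            log.Add("Ok");
        }
        else
        {
            consecutiveFailures++;
            log.Add("Failed");
            // A failed trial, or too many consecutive failures, (re)opens the breaker
            if (state == "HalfOpen" || consecutiveFailures >= failureThreshold)
            {
                state = "Open";
                openedAt = at;
            }
        }
    }
    return log;
}

var trace = Simulate(new[]
{
    (0, false), (1, false), (2, false), // 3 consecutive failures: opens at t=2
    (5, true),                          // still open: rejected without calling the service
    (12, false),                        // break elapsed: Half-Open trial fails, reopens at t=12
    (23, true),                         // next trial succeeds: breaker closes
    (24, true),
}, failureThreshold: 3, breakSeconds: 10);

Console.WriteLine(string.Join(", ", trace)); // Failed, Failed, Failed, Rejected, Failed, Ok, Ok
```

Passing timestamps in instead of reading the clock keeps the run repeatable; a production breaker also needs thread safety and typically admits only one trial call per Half-Open period.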
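2. Why do we need circuit breakers?
Problem scenario:
- Service A calls service B
- Service B becomes slow or unavailable
- A's threads block while waiting for B to respond
- A's thread pool is exhausted, so A can no longer handle other requests
- The failure cascades and the whole system can go down
What a circuit breaker provides:
- ✅ Fail fast - return an error immediately instead of waiting
- ✅ Resource protection - keep threads, connections and other resources from being exhausted
- ✅ Cascading-failure prevention - isolate the failing service
- ✅ Automatic recovery - periodically probe whether the service has recovered (Half-Open)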
3. Circuit breaker with Polly
Install the packages:
```bash
dotnet add package Polly
dotnet add package Microsoft.Extensions.Http.Polly
```
Basic usage:
```csharp
using Polly;
using Polly.CircuitBreaker;

// Circuit breaker policy: HttpRequestException and non-success status codes count as failures
var circuitBreaker = Policy
    .Handle<HttpRequestException>()
    .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
    .CircuitBreakerAsync(
        handledEventsAllowedBeforeBreaking: 3,      // open after 3 consecutive failures
        durationOfBreak: TimeSpan.FromSeconds(10),  // stay open for 10 seconds
        onBreak: (result, duration) =>
        {
            Console.WriteLine($"Circuit breaker opened. Duration: {duration}");
        },
        onReset: () =>
        {
            Console.WriteLine("Circuit breaker reset");
        },
        onHalfOpen: () =>
        {
            Console.WriteLine("Circuit breaker half-open");
        }
    );

// Usage: share one policy instance (e.g. a singleton) so the breaker state persists across calls.
// httpClient is an existing HttpClient instance.
try
{
    var response = await circuitBreaker.ExecuteAsync(async () =>
    {
        return await httpClient.GetAsync("https://api.example.com/data");
    });
}
catch (BrokenCircuitException)
{
    // Thrown immediately, without calling the service, while the circuit is open
    Console.WriteLine("Circuit breaker is open. Request rejected.");
}
```
4. Advanced circuit breaker strategies
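Unlike the basic breaker, which counts consecutive failures, the advanced breaker judges the failure ratio over a sampling window and refuses to judge on too few calls. The decision rule can be sketched as follows. This is a simplification: it evaluates one window of outcomes, whereas Polly keeps rolling statistics over samplingDuration, and ShouldBreak is a hypothetical helper.

```csharp
using System;
using System.Linq;

// Hypothetical decision rule: break when the failure ratio in the window reaches
// failureThreshold, but only if at least minimumThroughput calls were observed.
static bool ShouldBreak(bool[] outcomesInWindow, double failureThreshold, int minimumThroughput)
{
    // Too few calls to judge: never break on a tiny sample
    if (outcomesInWindow.Length < minimumThroughput)
        return false;

    double failureRatio = outcomesInWindow.Count(success => !success) / (double)outcomesInWindow.Length;
    return failureRatio >= failureThreshold;
}

// 4 failures out of 5 calls (80%), but fewer than 8 calls: no break
Console.WriteLine(ShouldBreak(new[] { false, false, false, false, true }, 0.5, 8)); // False

// 4 failures out of 8 calls (50% >= 50%) and the throughput is met: break
Console.WriteLine(ShouldBreak(new[] { false, true, false, true, false, true, false, true }, 0.5, 8)); // True
```

The minimum-throughput check is what stops a single failed call at a quiet moment from reading as a 100% failure rate.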
Polly's advanced circuit breaker:
```csharp
var advancedCircuitBreaker = Policy
    .Handle<HttpRequestException>()
    .AdvancedCircuitBreakerAsync(
        failureThreshold: 0.5,                       // break when >= 50% of calls fail...
        samplingDuration: TimeSpan.FromSeconds(10),  // ...within a 10-second sampling window...
        minimumThroughput: 8,                        // ...and at least 8 calls were made in it
        durationOfBreak: TimeSpan.FromSeconds(30),   // stay open for 30 seconds
        onBreak: (exception, duration) => { },
        onReset: () => { },
        onHalfOpen: () => { }
    );
```
Combining policies (retry wrapped around a circuit breaker):
```csharp
// In outer.WrapAsync(inner) the first policy is the outer one:
// every retry attempt passes through the circuit breaker
var policy = Policy
    .Handle<HttpRequestException>()
    .WaitAndRetryAsync(
        retryCount: 3,
        sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
        onRetry: (exception, timespan, retryCount, context) =>
        {
            Console.WriteLine($"Retry {retryCount} after {timespan}");
        }
    )
    .WrapAsync(
        Policy
            .Handle<HttpRequestException>()
            .CircuitBreakerAsync(
                handledEventsAllowedBeforeBreaking: 5,
                durationOfBreak: TimeSpan.FromSeconds(30)
            )
    );
```
5. HttpClient integration
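```csharp
// Program.cs
using Polly;
using Polly.Extensions.Http; // HttpPolicyExtensions, installed with Microsoft.Extensions.Http.Polly

builder.Services.AddHttpClient<UserService>(client =>
{
    client.BaseAddress = new Uri("https://api.example.com");
})
// The handler added first is the outer one: retry wraps the circuit breaker
.AddPolicyHandler(GetRetryPolicy())
.AddPolicyHandler(GetCircuitBreakerPolicy());

// Retry policy: HandleTransientHttpError covers HttpRequestException, 5xx and 408 responses;
// waits 2, 4 and 8 seconds between attempts
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .WaitAndRetryAsync(
            retryCount: 3,
            sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
        );
}

// Circuit breaker policy: this single instance is shared by all UserService clients,
// so the breaker state survives across requests
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(
            handledEventsAllowedBeforeBreaking: 5,
            durationOfBreak: TimeSpan.FromSeconds(30)
        );
}
```
II. Rate Limiting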
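1. What is rate limiting?
Rate limiting is a protection mechanism that caps the rate of incoming requests so that the system is not overloaded.
Rate limiting algorithms:
- Fixed window - limits the number of requests in each fixed time window
- Sliding window - limits the number of requests in a window that moves with time
- Token bucket - tokens are added at a fixed rate and each request consumes one
- Leaky bucket - requests are processed (drained) at a fixed rate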
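The token bucket's behaviour (bursts allowed up to the bucket size, average rate capped by the refill rate) is easiest to see in code. A minimal sketch, assuming continuous refill, a single thread and explicit timestamps in seconds; the .NET TokenBucketRateLimiter instead adds tokens in discrete steps once per ReplenishmentPeriod, and the names here (TryAcquire, capacity, tokensPerPeriod) are illustrative.

```csharp
using System;

// Hypothetical token bucket: capacity ~ TokenLimit,
// tokensPerPeriod per periodSeconds ~ TokensPerPeriod per ReplenishmentPeriod.
double capacity = 5, tokensPerPeriod = 1, periodSeconds = 1;
double tokens = capacity;  // starts full, so a burst of `capacity` requests is allowed
double lastRefill = 0;

bool TryAcquire(double now)
{
    // Add the tokens earned since the last call, never exceeding the capacity
    tokens = Math.Min(capacity, tokens + (now - lastRefill) / periodSeconds * tokensPerPeriod);
    lastRefill = now;
    if (tokens < 1) return false;  // empty bucket: reject
    tokens -= 1;                   // each request consumes one token
    return true;
}

// A burst of 7 requests at t=0: the first 5 pass, the last 2 are rejected
for (var i = 0; i < 7; i++) Console.Write(TryAcquire(0) ? "+" : "-");
Console.WriteLine(); // +++++--

// After 2 seconds, 2 tokens have been added back
Console.WriteLine($"{TryAcquire(2)} {TryAcquire(2)} {TryAcquire(2)}"); // True True False
```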
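2. Rate limiting in ASP.NET Core
Installation: on .NET 7 and later the rate limiting middleware ships with ASP.NET Core (namespaces Microsoft.AspNetCore.RateLimiting and System.Threading.RateLimiting), so no separate package is needed.
Configure the limiters: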
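```csharp
// Program.cs
using System.Threading.RateLimiting;
using Microsoft.AspNetCore.RateLimiting;

builder.Services.AddRateLimiter(options =>
{
    // Rejected requests get 503 by default; 429 Too Many Requests is more precise
    options.RejectionStatusCode = StatusCodes.Status429TooManyRequests;

    // Fixed window: 100 requests per 10-second window; up to 10 more wait in a queue
    options.AddFixedWindowLimiter("fixed", opt =>
    {
        opt.Window = TimeSpan.FromSeconds(10);
        opt.PermitLimit = 100;
        opt.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        opt.QueueLimit = 10;
    });
    // Sliding window: the 10-second window is split into 2 segments of 5 seconds
    options.AddSlidingWindowLimiter("sliding", opt =>
    {
        opt.Window = TimeSpan.FromSeconds(10);
        opt.PermitLimit = 100;
        opt.SegmentsPerWindow = 2;
        opt.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        opt.QueueLimit = 10;
    });
    // Token bucket: holds at most 100 tokens; 20 tokens are added every 10 seconds
    options.AddTokenBucketLimiter("token", opt =>
    {
        opt.TokenLimit = 100;
        opt.ReplenishmentPeriod = TimeSpan.FromSeconds(10);
        opt.TokensPerPeriod = 20;
        opt.AutoReplenishment = true;
    });
    // Concurrency: at most 10 requests in flight at once, 10 more queued
    options.AddConcurrencyLimiter("concurrency", opt =>
    {
        opt.PermitLimit = 10;
        opt.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
        opt.QueueLimit = 10;
    });
});

var app = builder.Build();
app.UseRateLimiter();
app.MapControllers(); // assumes builder.Services.AddControllers() was called
app.Run();
```
Apply a limiter: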
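```csharp
[EnableRateLimiting("fixed")] // applies the "fixed" policy to every action in this controller
[ApiController]
[Route("api/[controller]")]
public class UsersController : ControllerBase
{
    [HttpGet]
    public IActionResult GetUsers()
    {
        return Ok(users); // users: your data source (not shown)
    }
}

// Alternatively, apply one limiter to every request via GlobalLimiter (Program.cs).
// app.UseRateLimiter() is required either way: it adds the middleware but defines no limits.
builder.Services.AddRateLimiter(options =>
{
    options.GlobalLimiter = PartitionedRateLimiter.Create<HttpContext, string>(context =>
        RateLimitPartition.GetFixedWindowLimiter("global", _ => new FixedWindowRateLimiterOptions
        {
            Window = TimeSpan.FromSeconds(10),
            PermitLimit = 100
        }));
});
```
3. Policy-based (partitioned) rate limiting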
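Partitioning means each key (client IP, user id, ...) gets its own independent limiter. The idea can be sketched with one fixed-window counter per key in a dictionary. This is hypothetical in-memory code, not how RateLimitPartition is implemented; note that the dictionary grows with the number of distinct keys.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-key fixed windows: key -> (window index, requests counted in it)
var windows = new Dictionary<string, (long Window, int Count)>();

bool TryAcquire(string partitionKey, double nowSeconds, int permitLimit, double windowSeconds)
{
    var window = (long)Math.Floor(nowSeconds / windowSeconds);
    windows.TryGetValue(partitionKey, out var state);
    if (state.Window != window) state = (window, 0); // new window for this key: reset its counter
    if (state.Count >= permitLimit) return false;
    windows[partitionKey] = (window, state.Count + 1);
    return true;
}

// alice exhausts her own limit of 2; bob is unaffected
Console.WriteLine($"{TryAcquire("alice", 0, 2, 10)} {TryAcquire("alice", 0, 2, 10)} {TryAcquire("alice", 0, 2, 10)}"); // True True False
Console.WriteLine(TryAcquire("bob", 0, 2, 10)); // True
```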
```csharp
using System.Security.Claims;
using System.Threading.RateLimiting;

builder.Services.AddRateLimiter(options =>
{
    // Per-IP limit: each client IP gets its own fixed window
    options.AddPolicy("ip", context =>
    {
        // Behind a reverse proxy this is the proxy's address unless forwarded headers are processed
        var ip = context.Connection.RemoteIpAddress?.ToString() ?? "unknown";
        return RateLimitPartition.GetFixedWindowLimiter(
            partitionKey: ip,
            factory: partition => new FixedWindowRateLimiterOptions
            {
                Window = TimeSpan.FromSeconds(10),
                PermitLimit = 100
            }
        );
    });
    // Per-user limit: 60 requests per minute per user; anonymous requests share one partition
    options.AddPolicy("user", context =>
    {
        var userId = context.User?.FindFirst(ClaimTypes.NameIdentifier)?.Value;
        return RateLimitPartition.GetFixedWindowLimiter(
            partitionKey: userId ?? "anonymous",
            factory: partition => new FixedWindowRateLimiterOptions
            {
                Window = TimeSpan.FromMinutes(1),
                PermitLimit = 60
            }
        );
    });
});

// Usage (inside a controller)
[EnableRateLimiting("user")]
[HttpGet("profile")]
public IActionResult GetProfile()
{
    return Ok(profile); // profile: your data (not shown)
}
```
4. Distributed rate limiting with Redis
Install the package (AspNetCoreRateLimit is a third-party library, not part of ASP.NET Core):
```bash
dotnet add package AspNetCoreRateLimit.Redis
```
Configuration:
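```csharp
using AspNetCoreRateLimit;

// NOTE: the stores registered below are in-memory (MemoryCache*), so each instance keeps its own
// counters. For limits shared across instances, register the package's Redis-backed stores
// instead; check the AspNetCoreRateLimit documentation for the exact registration calls.
builder.Services.AddMemoryCache();
builder.Services.Configure<IpRateLimitOptions>(options =>
{
    options.EnableEndpointRateLimiting = true; // limits are counted separately per endpoint
    options.StackBlockedRequests = false;      // rejected requests do not count toward the limit
    options.HttpStatusCode = 429;
    options.GeneralRules = new List<RateLimitRule>
    {
        new RateLimitRule
        {
            Endpoint = "*",
            Period = "1m",  // 100 requests per minute per client IP
            Limit = 100
        }
    };
});
builder.Services.AddSingleton<IIpPolicyStore, MemoryCacheIpPolicyStore>();
builder.Services.AddSingleton<IRateLimitCounterStore, MemoryCacheRateLimitCounterStore>();
builder.Services.AddSingleton<IRateLimitConfiguration, RateLimitConfiguration>();
builder.Services.AddSingleton<IProcessingStrategy, AsyncKeyLockProcessingStrategy>();

var app = builder.Build();
app.UseIpRateLimiting();
```
III. Fallback (Graceful Degradation)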
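1. What is fallback?
Fallback (degradation) means that when a service is unavailable, the caller returns a default value or cached data instead of an error, so the system stays at least partially usable.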
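Stripped of any library, a fallback is a guarded call that remembers the last good value. A minimal sketch, assuming a hypothetical in-memory dictionary as the cache (a real service would use IMemoryCache, Redis, etc.) and catching every exception for brevity; GetWithFallback is an illustrative helper name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical cache of the last good response per key
var cache = new Dictionary<string, string>();

string GetWithFallback(string key, Func<string> primary, string defaultValue)
{
    try
    {
        var value = primary();
        cache[key] = value;  // remember the last good response
        return value;
    }
    catch (Exception)        // real code should catch only the failures it expects
    {
        // Degrade: serve possibly stale cached data, or a default if nothing is cached
        return cache.TryGetValue(key, out var cached) ? cached : defaultValue;
    }
}

Console.WriteLine(GetWithFallback("users", () => "[alice, bob]", "[]"));                // [alice, bob]
Console.WriteLine(GetWithFallback("users", () => throw new TimeoutException(), "[]"));  // [alice, bob]
Console.WriteLine(GetWithFallback("orders", () => throw new TimeoutException(), "[]")); // []
```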
2. Fallback with Polly
```csharp
using System.Net;
using System.Text.Json;
using Polly;

var fallbackPolicy = Policy
    .Handle<HttpRequestException>()
    .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
    .FallbackAsync(
        // Fallback action: return a default value or cached data instead of the failure
        cancellationToken => Task.FromResult(new HttpResponseMessage
        {
            StatusCode = HttpStatusCode.OK,
            Content = new StringContent(JsonSerializer.Serialize(new { message = "Service unavailable, using cached data" }))
        }),
        // Invoked when the fallback is triggered, e.g. for logging
        outcome =>
        {
            Console.WriteLine("Fallback executed");
            return Task.CompletedTask;
        }
    );

// httpClient is an existing HttpClient instance
var response = await fallbackPolicy.ExecuteAsync(async () =>
{
    return await httpClient.GetAsync("https://api.example.com/data");
});
```
3. Combined policy (retry + circuit breaker + fallback)
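When policies are nested, the outermost one sees the call first and the failure last, so the fallback must sit outside the retry; otherwise it swallows the first failure and the retry never runs. The sketch below shows this with plain delegates rather than Polly: Retry and Fallback are hypothetical helpers (no delays, string results) that only illustrate nesting order.

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// Hypothetical retry wrapper: re-invokes the inner call up to `retries` extra times
Func<string> Retry(Func<string> inner, int retries) => () =>
{
    for (var attempt = 0; ; attempt++)
    {
        try { return inner(); }
        catch (Exception) when (attempt < retries) { log.Add($"retry {attempt + 1}"); }
    }
};

// Hypothetical fallback wrapper: any failure of the inner call yields fallbackValue
Func<string> Fallback(Func<string> inner, string fallbackValue) => () =>
{
    try { return inner(); }
    catch (Exception) { log.Add("fallback"); return fallbackValue; }
};

Func<string> failingCall = () => { log.Add("call"); throw new InvalidOperationException("downstream down"); };

// Fallback outside retry: all retries happen before falling back
var good = Fallback(Retry(failingCall, retries: 2), "cached");
Console.WriteLine(good());                 // cached
Console.WriteLine(string.Join(", ", log)); // call, retry 1, call, retry 2, call, fallback

log.Clear();
// Fallback inside retry: the first failure is swallowed, so retry never runs
var bad = Retry(Fallback(failingCall, "cached"), retries: 2);
Console.WriteLine(bad());                  // cached
Console.WriteLine(string.Join(", ", log)); // call, fallback
```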
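```csharp
using System.Net;
using System.Text.Json;
using Polly;
using Polly.CircuitBreaker;

// Retry: up to 3 retries, waiting 2, 4 and 8 seconds
var retryPolicy = Policy
    .Handle<HttpRequestException>()
    .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
    .WaitAndRetryAsync(
        retryCount: 3,
        sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
    );

// Circuit breaker: opens for 30 seconds after 5 consecutive failures
var circuitBreakerPolicy = Policy
    .Handle<HttpRequestException>()
    .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
    .CircuitBreakerAsync(
        handledEventsAllowedBeforeBreaking: 5,
        durationOfBreak: TimeSpan.FromSeconds(30)
    );

// Fallback: last line of defence, also covering the BrokenCircuitException of an open breaker
var fallbackPolicy = Policy
    .Handle<HttpRequestException>()
    .Or<BrokenCircuitException>()
    .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
    .FallbackAsync(cancellationToken => Task.FromResult(new HttpResponseMessage
    {
        StatusCode = HttpStatusCode.OK,
        Content = new StringContent(JsonSerializer.Serialize(new { message = "Fallback data" }))
    }));

// Listed outermost first: fallback -> retry -> circuit breaker -> HTTP call.
// With the fallback innermost, it would absorb every failure before retry and breaker saw it.
var policy = Policy.WrapAsync(fallbackPolicy, retryPolicy, circuitBreakerPolicy);
```
IV. Retry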
1. Simple retry
```csharp
var retryPolicy = Policy
    .Handle<HttpRequestException>()
    .RetryAsync(
        retryCount: 3,
        onRetry: (exception, retryCount) =>
        {
            Console.WriteLine($"Retry {retryCount}: {exception.Message}");
        }
    );

// Retries immediately, with no delay between attempts
await retryPolicy.ExecuteAsync(async () =>
{
    return await httpClient.GetAsync("https://api.example.com/data");
});
```
2. Retry with exponential backoff
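With sleepDurationProvider = 2^retryAttempt seconds (Polly numbers retry attempts from 1), the waits double each time, and the total adds up quickly. The sketch below computes the schedule for 5 retries; the optional maxDelay cap is an assumption (a common addition, not part of the Polly example) that keeps late attempts from waiting minutes. Backoff is a hypothetical helper.

```csharp
using System;

// Hypothetical helper: 2^retryAttempt seconds, optionally capped at maxDelay
static TimeSpan Backoff(int retryAttempt, TimeSpan? maxDelay = null)
{
    var delay = TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));
    return maxDelay is { } cap && delay > cap ? cap : delay;
}

var total = TimeSpan.Zero;
for (var attempt = 1; attempt <= 5; attempt++)
{
    var d = Backoff(attempt);
    total += d;
    Console.WriteLine($"retry {attempt}: wait {d.TotalSeconds} s"); // 2, 4, 8, 16, 32
}
Console.WriteLine($"total wait: {total.TotalSeconds} s"); // total wait: 62 s
```

A caller with 5 such retries can therefore wait more than a minute in total, which should fit within the caller's own timeout.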
```csharp
// Waits 2, 4, 8, 16 and 32 seconds before retries 1 to 5
var retryPolicy = Policy
    .Handle<HttpRequestException>()
    .WaitAndRetryAsync(
        retryCount: 5,
        sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
        onRetry: (exception, timeSpan, retryCount, context) =>
        {
            Console.WriteLine($"Retry {retryCount} after {timeSpan.TotalSeconds} seconds");
        }
    );
```
3. Retry with jitter
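```csharp
// Jitter spreads out retries from many clients so they do not hit the service at the same moment.
// Random.Shared (.NET 6+) is thread-safe; a single shared `new Random()` is not.
var retryPolicy = Policy
    .Handle<HttpRequestException>()
    .WaitAndRetryAsync(
        retryCount: 5,
        sleepDurationProvider: retryAttempt =>
        {
            var baseDelay = TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));          // exponential part
            var jitterDelay = TimeSpan.FromMilliseconds(Random.Shared.Next(0, 1000)); // random 0-999 ms
            return baseDelay.Add(jitterDelay);
        }
    );
```
V. Practical Scenarios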
1. Protecting calls between microservices
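```csharp
using System.Net;
using System.Text.Json;
using Polly;
using Polly.CircuitBreaker;

public class UserService
{
    private readonly HttpClient _httpClient;
    private readonly IAsyncPolicy<HttpResponseMessage> _policy;

    public UserService(HttpClient httpClient)
    {
        _httpClient = httpClient;

        // Caution: typed clients are transient, so a breaker built here is recreated with every
        // UserService and loses its state. In production, share the policy (e.g. a singleton,
        // or the AddPolicyHandler registration shown earlier).
        var retry = Policy
            .Handle<HttpRequestException>()
            .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
            .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

        var circuitBreaker = Policy
            .Handle<HttpRequestException>()
            .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
            .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));

        var fallback = Policy
            .Handle<HttpRequestException>()
            .Or<BrokenCircuitException>()
            .OrResult<HttpResponseMessage>(r => !r.IsSuccessStatusCode)
            .FallbackAsync(async cancellationToken => new HttpResponseMessage
            {
                StatusCode = HttpStatusCode.OK,
                // Return cached data; GetCachedUsersAsync (not shown) reads your cache
                Content = new StringContent(await GetCachedUsersAsync())
            });

        // Outermost first: fallback -> retry -> circuit breaker -> HTTP call
        _policy = Policy.WrapAsync(fallback, retry, circuitBreaker);
    }

    public async Task<List<User>> GetUsersAsync()
    {
        var response = await _policy.ExecuteAsync(() => _httpClient.GetAsync("/api/users"));
        var content = await response.Content.ReadAsStringAsync();
        return JsonSerializer.Deserialize<List<User>>(content) ?? new List<User>();
    }
}
```
2. Rate limiting an API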
```csharp
[EnableRateLimiting("api")]
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderRequest request)
    {
        // Handle order creation here (CreateOrderRequest is your request DTO)
        return Ok(new { orderId = Guid.NewGuid() });
    }
}

// Configuration (Program.cs)
builder.Services.AddRateLimiter(options =>
{
    // One limiter shared by all callers of this policy: 100 requests per minute,
    // with up to 10 additional requests queued
    options.AddFixedWindowLimiter("api", opt =>
    {
        opt.Window = TimeSpan.FromMinutes(1);
        opt.PermitLimit = 100;
        opt.QueueLimit = 10;
    });
});
```
3. Distributed rate limiting (Redis)
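```csharp
using StackExchange.Redis;

// Sliding-window log: one sorted-set entry per accepted request, scored by its timestamp.
// Timestamps come from the application servers, so their clocks should be synchronised.
public class RedisRateLimiter
{
    private readonly IDatabase _redis;

    public RedisRateLimiter(IConnectionMultiplexer redis)
    {
        _redis = redis.GetDatabase();
    }

    public async Task<bool> TryAcquireAsync(string key, int limit, TimeSpan window)
    {
        var nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        var windowStartMs = nowMs - (long)window.TotalMilliseconds;
        // Unique member, so two requests in the same millisecond are counted separately
        var member = $"{nowMs}-{Guid.NewGuid():N}";

        // A Lua script makes the check-and-add atomic
        const string script = @"
            redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])  -- drop entries older than the window
            local count = redis.call('ZCARD', KEYS[1])
            if count < tonumber(ARGV[2]) then
                redis.call('ZADD', KEYS[1], ARGV[3], ARGV[4])
                redis.call('PEXPIRE', KEYS[1], ARGV[5])         -- let idle keys expire
                return 1
            else
                return 0
            end";

        var result = await _redis.ScriptEvaluateAsync(
            script,
            new RedisKey[] { key },
            new RedisValue[]
            {
                windowStartMs,
                limit,
                nowMs,
                member,
                (long)window.TotalMilliseconds
            });
        return (int)result == 1;
    }
}
```
VI. Common Interview Questions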
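Q1: What are the three states of a circuit breaker?
- Closed - normal state; requests pass through
- Open - failure state; requests are rejected immediately
- Half-Open - recovery state; a few trial requests are let through to test the service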
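Q2: What rate limiting algorithms are there?
- Fixed window - limits the number of requests in each fixed time window
- Sliding window - limits the number of requests in a window that moves with time
- Token bucket - tokens are added at a fixed rate and each request consumes one
- Leaky bucket - requests are processed at a fixed rate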
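ASP.NET Core has no built-in limiter named after the leaky bucket. One common formulation treats it as a bounded queue drained at a fixed rate; the sketch below assumes that formulation, one request per second and room for 3 waiting requests. Schedule is a hypothetical helper that returns when each request would be processed, or null if it is dropped.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical leaky bucket as a queue: returns each request's processing time, or null if dropped
static List<double?> Schedule(double[] arrivals, double interval, int capacity)
{
    var result = new List<double?>();
    var pending = new Queue<double>(); // processing times of requests still waiting
    double nextFree = 0;               // earliest time the next request can be processed

    foreach (var arrival in arrivals)
    {
        // Requests whose processing has already started are no longer waiting
        while (pending.Count > 0 && pending.Peek() <= arrival) pending.Dequeue();

        if (pending.Count >= capacity) { result.Add(null); continue; } // bucket full: drop

        var start = Math.Max(arrival, nextFree); // drain at a constant rate
        nextFree = start + interval;
        pending.Enqueue(start);
        result.Add(start);
    }
    return result;
}

// A burst of 6 simultaneous requests leaves the bucket one per second; overflow is dropped
var processed = Schedule(new double[] { 0, 0, 0, 0, 0, 0 }, interval: 1.0, capacity: 3);
var labels = new List<string>();
foreach (var t in processed) labels.Add(t?.ToString() ?? "dropped");
Console.WriteLine(string.Join(", ", labels)); // 0, 1, 2, 3, dropped, dropped
```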
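Q3: How do you choose a rate limiting strategy?
- Fixed window - simple scenarios; allows bursts: up to twice the limit can pass in a short span straddling a window boundary
- Sliding window - smoother limiting without the boundary burst, at the cost of more state
- Token bucket - allows bursts up to the bucket size while capping the average rate
- Leaky bucket - strictly constant processing rate; bursts are queued or dropped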
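The fixed-window boundary burst is easy to reproduce. This sketch uses assumed values (limit 3 per 10 seconds, timestamps in seconds, single thread) and sends three requests just before a window boundary and three just after it; the sliding log mirrors, in memory, the sorted-set approach of the Redis example in section V. FixedWindow and SlidingLog are hypothetical helpers.

```csharp
using System;
using System.Collections.Generic;

const int Limit = 3;      // requests allowed per window
const double Window = 10; // window length in seconds

// Fixed window: one counter per aligned window [k*10, (k+1)*10)
long currentWindow = -1;
int count = 0;
bool FixedWindow(double now)
{
    var window = (long)Math.Floor(now / Window);
    if (window != currentWindow) { currentWindow = window; count = 0; } // new window: reset
    if (count >= Limit) return false;
    count++;
    return true;
}

// Sliding log: keep the timestamps of requests accepted in the last 10 seconds
var accepted = new Queue<double>();
bool SlidingLog(double now)
{
    while (accepted.Count > 0 && accepted.Peek() <= now - Window) accepted.Dequeue();
    if (accepted.Count >= Limit) return false;
    accepted.Enqueue(now);
    return true;
}

// Three requests just before the boundary at t=10, three right at it
double[] times = { 9, 9, 9, 10, 10, 10 };
int fixedOk = 0, slidingOk = 0;
foreach (var t in times)
{
    if (FixedWindow(t)) fixedOk++;
    if (SlidingLog(t)) slidingOk++;
}
Console.WriteLine($"fixed window: {fixedOk}, sliding log: {slidingOk}"); // fixed window: 6, sliding log: 3
```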
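Q4: What is the difference between circuit breaking and fallback?
- Circuit breaking - fails fast by rejecting requests outright while the downstream service is unhealthy
- Fallback - returns a default value or cached data so that basic functionality remains available; it is often combined with a breaker to handle the breaker's rejections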
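Q5: How do you implement distributed rate limiting?
Options:
- Redis - keep the counters in Redis so all instances share them (a Lua script keeps check-and-update atomic)
- Database - keep the counters in a database (simpler, but usually slower and adds load to the database)
- Gateway - enforce limits centrally at the API gateway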
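VII. Best Practices
- ✅ Set thresholds sensibly - tune them to real traffic and business needs
- ✅ Monitor and alert - notice rate limiting and circuit breaking promptly
- ✅ Plan fallbacks - have a degradation strategy ready
- ✅ Retry strategy - use exponential backoff, ideally with jitter
- ✅ Distributed rate limiting - use Redis when running multiple instances
- ✅ User experience - return clear error messages (for example 429 with a Retry-After header)
- ✅ Logging - record rate limiting and circuit breaker events
- ✅ Test and verify - exercise failure scenarios
- ❌ Don't over-limit - overly strict limits hurt legitimate users
- ❌ Don't ignore monitoring - it is how problems are caught early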