
这篇文章的内容都是基于我们GoFrame微服务电商项目的实践,感兴趣的朋友可以点击查看。
1. 引言
在电商系统中,库存管理是一个至关重要的环节,特别是在高并发场景下(如秒杀、限时抢购等),如何保证库存的准确性,避免超卖现象,是系统稳定性和用户体验的关键。本文档详细介绍库存超卖问题,分析现有的解决方案,并通过实践对比Redis Lua脚本和分布式锁两种方案在库存扣减场景下的优缺点,提供完整的实现代码和最佳实践建议。
库存超卖是指系统在高并发情况下,实际销售的商品数量超过了系统中记录的库存数量。这可能导致商家无法履行订单,造成用户投诉和经济损失,严重影响平台信誉。
在传统的库存管理逻辑中,通常包括以下步骤:
在高并发场景下,由于多个请求同时访问数据库或缓存,可能会出现以下情况:
请求A: 查询库存 -> 库存为10
请求B: 查询库存 -> 库存为10
请求A: 扣减库存 -> 库存变为9
请求B: 扣减库存 -> 库存变为8
如果库存只有1个商品,但同时有10个请求查询到库存为1,那么这10个请求都会执行扣减操作,导致库存变为负数。这种情况在秒杀等高并发场景下尤为常见。
在我们的电商系统中,传统的库存扣减实现虽然使用了数据库事务来保证原子性,但在高并发场景下,仍然可能出现超卖问题。主要原因是:
为了解决库存超卖问题,我们实现了两种常见的解决方案:
分布式锁是解决分布式系统中并发控制的一种常用机制。在库存扣减场景中,我们使用Redis实现分布式锁,确保同一时间只有一个请求能够扣减特定商品的库存。
使用Redis的SET命令的NX选项来实现分布式锁。当多个客户端同时设置同一个键时,只有一个能成功,这样就实现了互斥锁的效果。锁的value使用随机字符串生成,确保只有持有锁的客户端才能释放锁。
Redis Lua脚本可以在Redis服务器端原子执行多条命令,避免了在客户端和服务器之间多次通信带来的竞态条件。
编写一个Lua脚本,在脚本中完成以下操作:
shop-goframe-micro-service-refacotor/
├── app/
│ └── goods/
│ ├── utility/
│ │ └── stock/
│ │ ├── stock.go # 库存管理器接口定义
│ │ ├── distributed_lock.go # 基于分布式锁的实现
│ │ ├── redis_lua.go # 基于Lua脚本的实现
│ │ └── stock_test.go # 对比测试代码
└── doc/
└── 库存防超卖(Redis Lua+分布式锁对比实践).md # 本文档
对比项 | Redis分布式锁 | Redis Lua脚本 |
|---|---|---|
实现原理 | 使用SET NX命令实现锁机制,在库存操作前后加锁/解锁 | 利用Redis单线程执行特性,将库存检查和扣减封装在一个原子性Lua脚本中 |
原子性保证 | 操作由多个Redis命令组成,依赖分布式锁保证原子性 | 整个操作在Redis服务器端作为单个原子命令执行 |
网络开销 | 至少需要3次网络交互(加锁、操作、解锁) | 仅需要1次网络交互(执行脚本) |
复杂度 | 较高,需要处理锁的获取、释放、超时等逻辑 | 中等,主要是Lua脚本编写 |
代码量 | 较多,需要实现锁的管理逻辑 | 较少,主要是脚本定义和调用 |
性能指标 | Redis分布式锁 | Redis Lua脚本 |
|---|---|---|
响应时间 | 较慢,受网络延迟影响大,需要多次交互 | 较快,网络交互少,一次请求完成所有操作 |
吞吐量 | 较低,高并发下锁竞争激烈,会出现线程等待 | 较高,避免了锁竞争,充分利用Redis性能 |
资源占用 | Redis连接占用时间长,CPU利用率较高 | Redis连接占用时间短,CPU利用率较低 |
扩展性 | 随并发增加,性能下降明显,呈非线性下降 | 随并发增加,性能相对稳定,接近线性扩展 |
高并发下的稳定性 | 较差,容易出现锁竞争和饥饿现象 | 较好,性能稳定,适合秒杀等高并发场景 |
可靠性指标 | Redis分布式锁 | Redis Lua脚本 |
|---|---|---|
防超卖能力 | 强,但依赖锁的正确实现 | 强,由Redis单线程执行保证 |
死锁风险 | 存在,需要设置合理的超时时间 | 无,不使用锁机制,不存在死锁问题 |
异常恢复 | 依赖锁超时自动释放,可能有时间窗口 | 自动回滚,不影响其他操作,原子性更强 |
一致性保证 | 最终一致性,可能存在瞬时不一致 | 强一致性,操作原子性,保证数据一致性 |
故障隔离 | 单个操作失败可能影响其他操作获取锁 | 操作失败互不影响,故障隔离性好 |
场景类型 | Redis分布式锁 | Redis Lua脚本 |
|---|---|---|
高并发秒杀 | 不推荐,性能瓶颈明显 | 强烈推荐,性能最优,原子性强 |
复杂业务逻辑 | 推荐,可以在锁内执行复杂操作 | 不推荐,Lua脚本不易处理复杂逻辑 |
多资源协调 | 推荐,可以协调多个资源的操作 | 不适用,难以处理跨多个键的复杂操作 |
库存扣减 | 适用,但性能不如Lua脚本 | 最佳选择,性能和可靠性兼顾 |
需要事务性操作 | 适合,可以在锁内执行多个步骤 | 适合简单事务,复杂事务难以实现 |
服务资源有限 | 不适合,锁竞争会加剧资源占用 | 更适合,资源利用效率更高 |
分布式锁实现注意事项:
Lua脚本实现注意事项:
我们设计了统一的StockManager接口,使得两种实现可以无缝切换:
// StockManager 库存管理器接口
type StockManager interface {
// ReduceStock 扣减库存
// 返回值:是否成功扣减,错误信息
ReduceStock(ctx context.Context, goodsId uint32, count int) (bool, error)
// ReturnStock 返还库存
// 返回值:是否成功返还,错误信息
ReturnStock(ctx context.Context, goodsId uint32, count int) (bool, error)
// GetStock 获取当前库存
// 返回值:当前库存数量,错误信息
GetStock(ctx context.Context, goodsId uint32) (int, error)
// InitStock 初始化库存
// 返回值:是否成功初始化,错误信息
InitStock(ctx context.Context, goodsId uint32, count int) (bool, error)
}
分布式锁实现的核心是DistributedLockStockManager结构体,它包含以下关键方法:
核心实现代码如下:
// acquireLock 获取分布式锁
func (m *DistributedLockStockManager) acquireLock(ctx context.Context, goodsId uint32) (string, error) {
lockKey := m.getLockKey(goodsId)
lockValue := gconv.String(gtime.TimestampNano())
// 使用SET命令的NX选项实现分布式锁,同时设置过期时间
success, err := m.redisClient.Set(ctx, lockKey, lockValue, m.lockTimeout).Result()
if err != nil {
return"", gerror.Wrapf(err, "获取分布式锁失败,商品ID:%d", goodsId)
}
if success != "OK" {
return"", gerror.Newf("获取分布式锁失败,锁已被占用,商品ID:%d", goodsId)
}
return lockValue, nil
}
// releaseLock 释放分布式锁
func (m *DistributedLockStockManager) releaseLock(ctx context.Context, goodsId uint32, lockValue string) error {
lockKey := m.getLockKey(goodsId)
// 使用Lua脚本确保原子性释放锁,避免误删其他客户端的锁
releaseLuaScript := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
_, err := m.redisClient.Eval(ctx, releaseLuaScript, []string{lockKey}, lockValue)
if err != nil {
return gerror.Wrapf(err, "释放分布式锁失败,商品ID:%d", goodsId)
}
returnnil
}
// ReduceStock 扣减库存(使用分布式锁)
func (m *DistributedLockStockManager) ReduceStock(ctx context.Context, goodsId uint32, count int) (bool, error) {
// 参数校验
if count < 1 {
returnfalse, gerror.New("扣减数量必须大于0")
}
// 获取分布式锁
lockValue, err := m.acquireLock(ctx, goodsId)
if err != nil {
returnfalse, err
}
// 确保释放锁
deferfunc() {
err := m.releaseLock(ctx, goodsId, lockValue)
if err != nil {
g.Log().Errorf(ctx, "释放分布式锁失败: %v", err)
}
}()
// 获取当前库存并扣减
stockKey := m.getStockKey(goodsId)
currentStockStr, err := m.redisClient.Get(ctx, stockKey)
if err != nil {
returnfalse, gerror.Wrapf(err, "获取当前库存失败,商品ID:%d", goodsId)
}
// 解析库存并检查是否充足
currentStock := 0
if currentStockStr.String() != "" {
currentStock = gconv.Int(currentStockStr.String())
}
if currentStock < count {
returnfalse, gerror.Newf("库存不足,商品ID:%d,当前库存:%d,请求数量:%d", goodsId, currentStock, count)
}
// 扣减库存
newStock := currentStock - count
_, err = m.redisClient.Set(ctx, stockKey, newStock, 0) // 0表示永不过期
if err != nil {
returnfalse, gerror.Wrapf(err, "更新库存失败,商品ID:%d", goodsId)
}
returntrue, nil
}
Lua脚本实现的核心是RedisLuaStockManager结构体,它包含以下关键部分:
核心实现代码如下:
// 定义Lua脚本
const (
// reduceStockLuaScript 库存扣减Lua脚本
reduceStockLuaScript = `
-- 获取当前库存
local currentStock = redis.call('get', KEYS[1])
if currentStock == false then
currentStock = 0
else
currentStock = tonumber(currentStock)
end
-- 检查库存是否足够
if currentStock >= tonumber(ARGV[1]) then
-- 扣减库存
redis.call('set', KEYS[1], currentStock - tonumber(ARGV[1]))
return 1 -- 扣减成功
else
return 0 -- 库存不足
end
`
// returnStockLuaScript 库存返还Lua脚本
returnStockLuaScript = `
-- 获取当前库存
local currentStock = redis.call('get', KEYS[1])
if currentStock == false then
currentStock = 0
else
currentStock = tonumber(currentStock)
end
-- 返还库存
redis.call('set', KEYS[1], currentStock + tonumber(ARGV[1]))
return 1 -- 返还成功
`
// initStockLuaScript 库存初始化Lua脚本
initStockLuaScript = `
-- 设置初始库存
redis.call('set', KEYS[1], tonumber(ARGV[1]))
return 1 -- 初始化成功
`
)
// ReduceStock 扣减库存(使用Lua脚本)
func (m *RedisLuaStockManager) ReduceStock(ctx context.Context, goodsId uint32, count int) (bool, error) {
// 参数校验
if count < 1 {
returnfalse, gerror.New("扣减数量必须大于0")
}
// 获取库存键
stockKey := m.getStockKey(goodsId)
// 执行Lua脚本
result, err := m.redisClient.Eval(ctx, reduceStockLuaScript, []string{stockKey}, count)
if err != nil {
returnfalse, gerror.Wrapf(err, "执行库存扣减Lua脚本失败,商品ID:%d,请求数量:%d", goodsId, count)
}
// 解析结果
resultInt, ok := result.(int64)
if !ok {
returnfalse, gerror.Newf("无法解析Lua脚本结果,商品ID:%d,请求数量:%d", goodsId, count)
}
if resultInt == 0 {
// 获取当前库存,用于错误消息
currentStock, _ := m.GetStock(ctx, goodsId)
returnfalse, gerror.Newf("库存不足,商品ID:%d,当前库存:%d,请求数量:%d", goodsId, currentStock, count)
}
return resultInt == 1, nil
}
我们实现了全面的测试用例,模拟高并发场景下两种方案的表现:
核心测试代码如下:
// TestStockManagerComparison 测试两种库存管理方案的对比
func TestStockManagerComparison(t *testing.T) {
// 测试分布式锁方案
t.Run("分布式锁方案", func(t *testing.T) {
// 初始化测试环境
ctx := context.Background()
goodsId := uint32(1)
initialStock := 100
concurrentRequests := 200
requestCount := 1
// 初始化库存
_, err := distributedLockManager.InitStock(ctx, goodsId, initialStock)
require.NoError(t, err)
// 创建结果通道和等待组
successChan := make(chanbool, concurrentRequests)
errChan := make(chan error, concurrentRequests)
var wg sync.WaitGroup
// 记录开始时间
startTime := time.Now()
// 启动并发请求
for i := 0; i < concurrentRequests; i++ {
wg.Add(1)
gofunc() {
defer wg.Done()
success, err := distributedLockManager.ReduceStock(ctx, goodsId, requestCount)
successChan <- success
if err != nil {
errChan <- err
} else {
errChan <- nil
}
}()
}
// 等待所有请求完成
wg.Wait()
close(successChan)
close(errChan)
// 计算执行时间
executionTime := time.Since(startTime)
// 统计结果
successCount := 0
errorCount := 0
for success := range successChan {
if success {
successCount++
}
}
for err := range errChan {
if err != nil {
errorCount++
}
}
// 获取最终库存
finalStock, err := distributedLockManager.GetStock(ctx, goodsId)
require.NoError(t, err)
// 验证结果
expectedSuccessCount := initialStock / requestCount
expectedFinalStock := initialStock - expectedSuccessCount*requestCount
t.Logf("执行时间: %v", executionTime)
t.Logf("成功次数: %d", successCount)
t.Logf("失败次数: %d", errorCount)
t.Logf("最终库存: %d", finalStock)
t.Logf("期望成功次数: %d", expectedSuccessCount)
t.Logf("期望最终库存: %d", expectedFinalStock)
// 验证库存是否正确
require.Equal(t, expectedFinalStock, finalStock)
require.Equal(t, expectedSuccessCount, successCount)
})
// 清理测试数据
ctx := context.Background()
goodsId := uint32(1)
// 清理Redis中的测试数据
_, _ = redisClient.Del(ctx, fmt.Sprintf("stock:%d", goodsId))
// 测试Redis Lua脚本方案
t.Run("Redis Lua脚本方案", func(t *testing.T) {
// 初始化测试环境
ctx := context.Background()
goodsId := uint32(1)
initialStock := 100
concurrentRequests := 200
requestCount := 1
// 初始化库存
_, err := redisLuaManager.InitStock(ctx, goodsId, initialStock)
require.NoError(t, err)
// 创建结果通道和等待组
successChan := make(chanbool, concurrentRequests)
errChan := make(chan error, concurrentRequests)
var wg sync.WaitGroup
// 记录开始时间
startTime := time.Now()
// 启动并发请求
for i := 0; i < concurrentRequests; i++ {
wg.Add(1)
gofunc() {
defer wg.Done()
success, err := redisLuaManager.ReduceStock(ctx, goodsId, requestCount)
successChan <- success
if err != nil {
errChan <- err
} else {
errChan <- nil
}
}()
}
// 等待所有请求完成
wg.Wait()
close(successChan)
close(errChan)
// 计算执行时间
executionTime := time.Since(startTime)
// 统计结果
successCount := 0
errorCount := 0
for success := range successChan {
if success {
successCount++
}
}
for err := range errChan {
if err != nil {
errorCount++
}
}
// 获取最终库存
finalStock, err := redisLuaManager.GetStock(ctx, goodsId)
require.NoError(t, err)
// 验证结果
expectedSuccessCount := initialStock / requestCount
expectedFinalStock := initialStock - expectedSuccessCount*requestCount
t.Logf("执行时间: %v", executionTime)
t.Logf("成功次数: %d", successCount)
t.Logf("失败次数: %d", errorCount)
t.Logf("最终库存: %d", finalStock)
t.Logf("期望成功次数: %d", expectedSuccessCount)
t.Logf("期望最终库存: %d", expectedFinalStock)
// 验证库存是否正确
require.Equal(t, expectedFinalStock, finalStock)
require.Equal(t, expectedSuccessCount, successCount)
})
}
通过对Redis分布式锁和Redis Lua脚本两种方案的详细实现和对比,我们可以得出以下结论: