
这篇文章的内容都是基于我们GoFrame微服务电商项目的实践,感兴趣的朋友可以点击查看

// 秒杀商品信息请求结构
type FlashSaleGoodsListReq struct {
ActivityId uint32 `json:"activity_id" v:"required"` // 活动ID
PageNum int `json:"page_num" v:"min:1"` // 页码
PageSize int `json:"page_size" v:"min:1,max:100"` // 每页数量
StartTime int64 `json:"start_time"` // 开始时间过滤
EndTime int64 `json:"end_time"` // 结束时间过滤
}
// 秒杀商品列表响应结构
type FlashSaleGoodsListRes struct {
Total int64 `json:"total"` // 总数量
List []*FlashSaleGoodsInfo `json:"list"` // 商品列表
}
// 秒杀商品详情请求/响应结构
type FlashSaleGoodsDetailReq struct {
GoodsId uint32 `json:"goods_id" v:"required"` // 商品ID
ActivityId uint32 `json:"activity_id" v:"required"` // 活动ID
}
type FlashSaleGoodsDetailRes struct {
GoodsInfo *FlashSaleGoodsInfo `json:"goods_info"` // 商品信息
StockInfo *StockInfo `json:"stock_info"` // 库存信息
ActivityInfo *ActivityInfo `json:"activity_info"` // 活动信息
RemainSeconds int64 `json:"remain_seconds"` // 距离开始/结束的秒数
}
// 秒杀商品信息服务接口
type FlashSaleGoodsService interface {
GetFlashSaleGoodsList(ctx context.Context, req *FlashSaleGoodsListReq) (*FlashSaleGoodsListRes, error)
GetFlashSaleGoodsDetail(ctx context.Context, req *FlashSaleGoodsDetailReq) (*FlashSaleGoodsDetailRes, error)
PreheatFlashSaleGoods(ctx context.Context, activityId uint32) error
}
// 创建秒杀订单请求/响应结构
type CreateFlashSaleOrderReq struct {
GoodsId uint32 `json:"goods_id" v:"required"` // 商品ID
ActivityId uint32 `json:"activity_id" v:"required"` // 活动ID
UserId uint32 `json:"user_id" v:"required"` // 用户ID
Count int `json:"count" v:"min:1,max:10"` // 购买数量
}
type CreateFlashSaleOrderRes struct {
Success bool `json:"success"` // 是否成功
OrderNo string `json:"order_no"` // 订单号(如果成功)
Message string `json:"message"` // 提示信息
ResultId string `json:"result_id"` // 结果查询ID
Status int `json:"status"` // 状态码:0-处理中,1-成功,2-失败
}
// 查询秒杀结果请求/响应结构
type GetFlashSaleResultReq struct {
ResultId string `json:"result_id" v:"required"` // 结果查询ID
UserId uint32 `json:"user_id" v:"required"` // 用户ID
}
type GetFlashSaleResultRes struct {
Status int `json:"status"` // 状态码:0-处理中,1-成功,2-失败
Message string `json:"message"` // 提示信息
OrderNo string `json:"order_no"` // 订单号(如果成功)
GoodsId uint32 `json:"goods_id"` // 商品ID
PayAmount int64 `json:"pay_amount"` // 支付金额
}
// 秒杀操作服务接口
type FlashSaleService interface {
CreateFlashSaleOrder(ctx context.Context, req *CreateFlashSaleOrderReq) (*CreateFlashSaleOrderRes, error)
GetFlashSaleResult(ctx context.Context, req *GetFlashSaleResultReq) (*GetFlashSaleResultRes, error)
ProcessFlashSaleOrder(ctx context.Context, orderInfo *FlashSaleOrderInfo) error
}
// 库存信息结构
type StockInfo struct {
GoodsId uint32 `json:"goods_id"` // 商品ID
TotalStock int `json:"total_stock"` // 总库存
AvailableStock int `json:"available_stock"` // 可用库存
LockedStock int `json:"locked_stock"` // 锁定库存
}
// 库存服务秒杀扩展接口
type FlashSaleStockService interface {
ReduceFlashSaleStock(ctx context.Context, goodsId uint32, userId uint32, count int) (bool, error)
InitFlashSaleStock(ctx context.Context, goodsId uint32, count int) (bool, error)
GetFlashSaleStock(ctx context.Context, goodsId uint32) (*StockInfo, error)
LockFlashSaleStock(ctx context.Context, goodsId uint32, orderNo string, count int) (bool, error)
ConfirmFlashSaleStock(ctx context.Context, goodsId uint32, orderNo string) (bool, error)
UnlockFlashSaleStock(ctx context.Context, goodsId uint32, orderNo string) (bool, error)
}
// 集成到现有库存管理
func (s *StockManagerImpl) ReduceFlashSaleStock(ctx context.Context, goodsId uint32, userId uint32, count int) (bool, error) {
// 复用现有库存扣减逻辑,使用专门的秒杀库存键
// 结合Redis Lua脚本确保原子性操作
return s.luaStockManager.ReduceFlashSaleStock(ctx, goodsId, userId, count)
}
// 秒杀消息结构
type FlashSaleMessage struct {
OrderNo string `json:"order_no"` // 订单号
GoodsId uint32 `json:"goods_id"` // 商品ID
ActivityId uint32 `json:"activity_id"` // 活动ID
UserId uint32 `json:"user_id"` // 用户ID
Count int `json:"count"` // 购买数量
Amount int64 `json:"amount"` // 金额
CreateTime int64 `json:"create_time"` // 创建时间
}
// 消息队列服务接口
type MessageQueueService interface {
PublishFlashSaleMessage(ctx context.Context, msg *FlashSaleMessage) error
ConsumeFlashSaleMessage(ctx context.Context, handler func(ctx context.Context, msg *FlashSaleMessage) error) error
}
// app/gateway-h5/internal/middleware/sentinel.go
package middleware
import (
"context"
"github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/gogf/gf/v2/net/ghttp"
"net/http"
"time"
)
// SentinelMiddleware Sentinel中间件
func SentinelMiddleware() ghttp.HandlerFunc {
return func(r *ghttp.Request) {
resourceName := r.RequestURI // 请求路径作为资源名
ctx := context.Background()
// 执行限流控制
entry, blockErr := api.Entry(
resourceName,
api.WithResourceType(base.ResTypeWeb),
api.WithTrafficType(base.Inbound),
)
if blockErr != nil {
// 被限流或熔断
r.Response.WriteJsonExit(map[string]interface{}{
"code": 429,
"message": "当前请求人数过多,请稍后重试",
"data": nil,
})
return
}
defer entry.Exit()
// 继续处理请求
r.Middleware.Next()
}
}
// app/flash-sale/internal/service/sentinel.go
package service
import (
"context"
"github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/gogf/gf/v2/errors/gerror"
)
// SentinelService Sentinel服务封装
type SentinelService struct{}
func NewSentinelService() *SentinelService {
return &SentinelService{}
}
// DoWithSentinel 使用Sentinel包装业务逻辑
func (s *SentinelService) DoWithSentinel(ctx context.Context, resourceName string, fn func(ctx context.Context) error) error {
entry, blockErr := api.Entry(
resourceName,
api.WithResourceType(base.ResTypeCommon),
api.WithTrafficType(base.Inbound),
)
if blockErr != nil {
return gerror.New("当前请求人数过多,请稍后重试")
}
defer entry.Exit()
return fn(ctx)
}
// app/gateway-h5/internal/init/sentinel.go
package init
import (
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/rule"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/hotspot"
"github.com/alibaba/sentinel-golang/ext/datasource"
"time"
)
// InitSentinel 初始化Sentinel
func InitSentinel() error {
// 初始化Sentinel
if err := api.InitDefault(); err != nil {
return err
}
// 加载限流、熔断、热点参数规则
loadFlowRules()
loadCircuitBreakerRules()
loadHotspotRules()
return nil
}
// 全局限流规则
func loadFlowRules() {
rules := []*flow.Rule{
// 全局QPS限流
{
Resource: "global",
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Threshold: 10000, // 全局QPS上限
StatIntervalInMs: 1000,
},
// 秒杀接口限流
{
Resource: "/api/v1/flash-sale/create",
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Threshold: 5000, // 秒杀接口QPS上限
StatIntervalInMs: 1000,
},
// 秒杀商品详情接口限流(预热模式)
{
Resource: "/api/v1/flash-sale/goods/detail",
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.WarmUp,
Threshold: 20000, // 详情接口QPS上限
WarmUpPeriodSec: 10, // 10秒预热
StatIntervalInMs: 1000,
},
}
_, err := flow.LoadRules(rules)
if err != nil {
// 记录错误日志
}
}
// 用户级别限流 - 自定义资源名
func createUserResourceName(userId uint32, baseResource string) string {
return fmt.Sprintf("%s:user:%d", baseResource, userId)
}
// 可在运行时动态为活跃用户添加限流规则,或结合热点参数限流实现
// 热点参数限流规则
func loadHotspotRules() {
rules := []*hotspot.Rule{
// 对秒杀商品ID进行热点参数限流
{
Resource: "/api/v1/flash-sale/create",
ParamIdx: 0, // 商品ID为第一个参数
MetricType: hotspot.QPS,
ControlBehavior: flow.Reject,
BurstCount: 10,
ParamFlowItems: []*hotspot.ParamFlowItem{
{
Object: "1001", // 热门商品ID
ClassType: "string",
Count: 100, // 该商品QPS限制
},
},
},
// 对用户ID限流(防刷单)
{
Resource: "/api/v1/flash-sale/create",
ParamIdx: 1, // 用户ID为第二个参数
MetricType: hotspot.QPS,
ControlBehavior: flow.Reject,
Count: 5, // 单个用户每秒最多5个请求
},
}
_, err := hotspot.LoadRules(rules)
if err != nil {
// 记录错误日志
}
}
// 错误率熔断规则
func loadCircuitBreakerRules() {
rules := []*circuitbreaker.Rule{
// 基于错误率的熔断
{
Resource: "flash_sale_create_order",
Strategy: circuitbreaker.ErrorRatio,
MinRequestAmount: 100, // 最小请求数
Threshold: 0.5, // 错误率阈值50%
RecoveryTimeoutSec: 5, // 熔断恢复时间5秒
StatIntervalInMs: 1000, // 统计窗口
StatSlidingWindowBucketCount: 10, // 滑动窗口桶数量
},
// 基于响应时间的熔断
{
Resource: "flash_sale_create_order",
Strategy: circuitbreaker.SlowRequestRatio,
MinRequestAmount: 100,
Threshold: 0.5, // 慢调用比例阈值50%
SlowRatioThreshold: 0.5,
MaxAllowedRtMs: 500, // 最大允许响应时间500ms
RecoveryTimeoutSec: 5,
StatIntervalInMs: 1000,
StatSlidingWindowBucketCount: 10,
},
}
_, err := circuitbreaker.LoadRules(rules)
if err != nil {
// 记录错误日志
}
}
// 在秒杀服务中处理熔断降级
func (s *FlashSaleServiceImpl) CreateFlashSaleOrder(ctx context.Context, req *CreateFlashSaleOrderReq) (*CreateFlashSaleOrderRes, error) {
var result *CreateFlashSaleOrderRes
var err error
// 使用Sentinel包装业务逻辑
sentinelErr := s.sentinelService.DoWithSentinel(ctx, "flash_sale_create_order", func(ctx context.Context) error {
result, err = s.doCreateFlashSaleOrder(ctx, req)
return err
})
if sentinelErr != nil {
return &CreateFlashSaleOrderRes{
Success: false,
Message: "当前请求人数过多,请稍后重试",
Status: 2,
}, nil
}
return result, err
}
// 配置动态数据源(以etcd为例)
func setupDynamicDataSource() error {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
return err
}
// 流控规则数据源
flowDataSource := datasource.NewEtcdDataSource(
client, "sentinel/rules/flow", datasource.FlowRuleJsonParser,
)
flow.RegisterFlowDataSource(flowDataSource)
// 熔断规则数据源
cbDataSource := datasource.NewEtcdDataSource(
client, "sentinel/rules/circuitbreaker", datasource.CircuitbreakerRuleJsonParser,
)
circuitbreaker.RegisterCircuitbreakerDataSource(cbDataSource)
return nil
}
// 配置监控和告警
func setupMonitoring() error {
config := &dashboard.Config{
CollectorAddr: "localhost:8719", // Sentinel Dashboard地址
HeartbeatIntervalMs: 10000, // 心跳间隔
}
return dashboard.Start(config)
}
// app/flash-sale/internal/service/token_bucket.go
package service
import (
"context"
"fmt"
"time"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/util/gconv"
)
// TokenBucketService 令牌桶服务
type TokenBucketService struct {
redisClient *redis.Client
}
func NewTokenBucketService(redisClient *redis.Client) *TokenBucketService {
return &TokenBucketService{redisClient: redisClient}
}
// 尝试获取令牌
func (s *TokenBucketService) TryAcquire(ctx context.Context, key string, rate float64, burst int, tokens int) (bool, error) {
result, err := s.redisClient.Eval(ctx, tokenBucketLuaScript,
[]string{key},
rate, burst, tokens, time.Now().UnixNano()/1000000,
).Result()
if err != nil {
return false, gerror.Wrap(err, "尝试获取令牌失败")
}
return gconv.Bool(result), nil
}
// 令牌桶键定义
const (
TokenBucketKeyPrefix = "flash_sale:token_bucket:"
GlobalTokenBucketKey = "flash_sale:token_bucket:global"
GoodsTokenBucketKey = "flash_sale:token_bucket:goods:%d"
UserTokenBucketKey = "flash_sale:token_bucket:user:%d"
ActivityTokenBucketKey = "flash_sale:token_bucket:activity:%d"
)
// 生成各类令牌桶键
func GetGoodsTokenBucketKey(goodsId uint32) string {
return fmt.Sprintf(GoodsTokenBucketKey, goodsId)
}
// 类似实现GetUserTokenBucketKey、GetActivityTokenBucketKey
// 令牌桶Lua脚本(原子性操作)
const tokenBucketLuaScript = `
local key = KEYS[1]
local rate = tonumber(ARGV[1]) -- 令牌生成速率(个/毫秒)
local burst = tonumber(ARGV[2]) -- 桶容量
local tokens = tonumber(ARGV[3]) -- 请求令牌数
local now = tonumber(ARGV[4]) -- 当前时间戳(毫秒)
-- 获取当前桶状态
local bucket = redis.call('hmget', key, 'last_refill_time', 'available_tokens')
local lastRefillTime = tonumber(bucket[1] or now)
local availableTokens = tonumber(bucket[2] or burst)
-- 计算补充的令牌数
local timeElapsed = now - lastRefillTime
local newTokens = timeElapsed * rate
availableTokens = math.min(burst, availableTokens + newTokens)
-- 检查并扣除令牌
if availableTokens >= tokens then
availableTokens = availableTokens - tokens
redis.call('hmset', key, 'last_refill_time', now, 'available_tokens', availableTokens)
redis.call('expire', key, 86400)
return 1
else
return 0
end
`
// MultiLevelTokenBucketService 多层令牌桶服务
type MultiLevelTokenBucketService struct {
tokenBucketService *TokenBucketService
}
func NewMultiLevelTokenBucketService(tokenBucketService *TokenBucketService) *MultiLevelTokenBucketService {
return &MultiLevelTokenBucketService{tokenBucketService: tokenBucketService}
}
// 尝试获取秒杀令牌(多层检查)
func (s *MultiLevelTokenBucketService) TryAcquireFlashSaleToken(ctx context.Context, goodsId, userId, activityId uint32) (bool, error) {
// 1. 全局令牌桶检查
if ok, _ := s.tokenBucketService.TryAcquire(ctx, GlobalTokenBucketKey, 1000.0/1000, 1000, 1); !ok {
return false, nil
}
// 2. 活动令牌桶检查
activityKey := GetActivityTokenBucketKey(activityId)
if ok, _ := s.tokenBucketService.TryAcquire(ctx, activityKey, 500.0/1000, 500, 1); !ok {
return false, nil
}
// 3. 商品令牌桶检查
goodsKey := GetGoodsTokenBucketKey(goodsId)
if ok, _ := s.tokenBucketService.TryAcquire(ctx, goodsKey, 100.0/1000, 100, 1); !ok {
return false, nil
}
// 4. 用户令牌桶检查(防刷单)
userKey := GetUserTokenBucketKey(userId)
if ok, _ := s.tokenBucketService.TryAcquire(ctx, userKey, 5.0/1000, 10, 1); !ok {
return false, nil
}
return true, nil
}
// AdaptiveTokenBucketManager 自适应令牌桶管理器
type AdaptiveTokenBucketManager struct {
redisClient *redis.Client
mu sync.RWMutex
configs map[string]*TokenBucketConfig
monitor *SystemMonitor
}
// TokenBucketConfig 令牌桶配置
type TokenBucketConfig struct {
Key string `json:"key"`
BaseRate float64 `json:"base_rate"` // 基础速率
MaxRate float64 `json:"max_rate"` // 最大速率
MinRate float64 `json:"min_rate"` // 最小速率
Burst int `json:"burst"` // 桶容量
AdjustStep float64 `json:"adjust_step"` // 调整步长
}
// 启动自适应调整
func (m *AdaptiveTokenBucketManager) StartAdaptiveAdjustment(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.adjustRates(ctx)
}
}
}
// 调整令牌生成速率
func (m *AdaptiveTokenBucketManager) adjustRates(ctx context.Context) {
load, err := m.monitor.GetSystemLoad(ctx)
if err != nil {
// 记录错误日志
return
}
m.mu.RLock()
defer m.mu.RUnlock()
for key, config := range m.configs {
var newRate float64
if load.CPUUsage > 0.8 {
// 高负载,降低速率
newRate = math.Max(config.MinRate, config.BaseRate * 0.9)
} else if load.CPUUsage < 0.5 {
// 低负载,提高速率
newRate = math.Min(config.MaxRate, config.BaseRate * 1.1)
} else {
newRate = config.BaseRate
}
config.BaseRate = newRate
// 保存配置到Redis
if err := m.saveConfigToRedis(ctx, key, config); err != nil {
// 记录错误日志
}
}
}
// 在秒杀服务中集成令牌桶
func (s *FlashSaleServiceImpl) CreateFlashSaleOrder(ctx context.Context, req *CreateFlashSaleOrderReq) (*CreateFlashSaleOrderRes, error) {
// 1. 令牌桶削峰检查
success, err := s.tokenBucketService.TryAcquireFlashSaleToken(
ctx, req.GoodsId, req.UserId, req.ActivityId,
)
if err != nil {
return nil, gerror.Wrap(err, "令牌桶检查失败")
}
if !success {
// 未获取令牌,返回排队信息
return &CreateFlashSaleOrderRes{
Success: false,
Message: "当前请求人数过多,请排队等待",
ResultId: generateResultId(req.UserId, req.GoodsId),
Status: 0, // 处理中
}, nil
}
// 2. 后续业务逻辑(库存检查、订单创建等)
// ...
return result, nil
}
// WarmupTokenBucketService 预热期令牌桶服务
type WarmupTokenBucketService struct {
tokenBucketService *TokenBucketService
adaptiveTokenManager *AdaptiveTokenBucketManager
}
// 开始预热
func (s *WarmupTokenBucketService) StartWarmup(ctx context.Context, activityId uint32, startTime time.Time, duration time.Duration) {
now := time.Now()
if now.After(startTime) {
return // 活动已开始,无需预热
}
// 计算预热间隔和步骤
warmupDuration := startTime.Sub(now)
if warmupDuration < time.Minute {
warmupDuration = time.Minute // 最小预热1分钟
}
steps := int(warmupDuration.Seconds() / 10) // 每10秒调整一次
if steps < 1 {
steps = 1
}
// 初始速率与目标速率
initialRate := 10.0 / 1000 // 10个/秒
targetRate := 100.0 / 1000 // 100个/秒
rateIncrement := (targetRate - initialRate) / float64(steps)
// 启动预热协程
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
currentRate := initialRate
for i := 0; i < steps; i++ {
select {
case <-ctx.Done():
return
case <-ticker.C:
activityKey := GetActivityTokenBucketKey(activityId)
// 更新令牌桶配置
s.adaptiveTokenManager.RegisterTokenBucket(&TokenBucketConfig{
Key: activityKey,
BaseRate: currentRate,
MaxRate: targetRate,
MinRate: initialRate,
Burst: int(currentRate * 2000), // 2秒突发容量
})
currentRate += rateIncrement // 提升速率
}
}
}()
}
秒杀场景下Redis缓存需满足高频读写、原子操作、防超卖等需求,核心数据结构按业务场景分类设计,避免冗余存储。
// 秒杀核心缓存键定义(按业务维度分类)
const (
// 商品维度:Hash存储商品基本信息,String存储库存/已售量
FlashSaleGoodsInfoKey = "flash_sale:goods:info:%d" // %d=商品ID,存储商品名称、价格、活动规则
FlashSaleGoodsStockKey = "flash_sale:goods:stock:%d" // %d=商品ID,存储实时可用库存
FlashSaleGoodsSoldKey = "flash_sale:goods:sold:%d" // %d=商品ID,存储已售数量(用于最终核对)
// 用户维度:Set防重复购买,String存储秒杀结果
FlashSaleUserBuyKey = "flash_sale:user:buy:%d:%d" // %d=用户ID:%d=商品ID,存储购买记录(防重复)
FlashSaleResultKey = "flash_sale:result:%s" // %s=结果ID,存储秒杀状态(0-处理中/1-成功/2-失败)
// 活动维度:Set存储活动商品列表,ZSet排序待预热商品
FlashSaleActivityGoods = "flash_sale:activity:goods:%d" // %d=活动ID,存储参与活动的商品ID集合
FlashSalePreheatQueue = "flash_sale:preheat:queue" // 待预热商品队列(按活动开始时间排序)
)
// 商品信息缓存结构体(与数据库字段对齐,精简冗余字段)
type FlashSaleGoodsCache struct {
GoodsId uint32 `json:"goods_id"` // 商品ID
ActivityId uint32 `json:"activity_id"` // 活动ID
Price int64 `json:"price"` // 秒杀价(分)
MaxBuy int `json:"max_buy"` // 单用户限购数
StartTime int64 `json:"start_time"` // 活动开始时间戳
EndTime int64 `json:"end_time"` // 活动结束时间戳
}
基于Redis实现分布式锁,确保库存操作的原子性,避免并发场景下的超卖问题,同时增加锁自动释放机制防止死锁。
// RedisLock 分布式锁核心实现
type RedisLock struct {
redisClient *redis.Client
key string // 锁键
value string // 唯一标识(防止误释放)
expiry time.Duration // 过期时间
}
// NewRedisLock 创建锁实例(秒杀场景建议过期时间3-5秒)
func NewRedisLock(redisClient *redis.Client, goodsId uint32) *RedisLock {
lockKey := fmt.Sprintf("flash_sale:lock:stock:%d", goodsId)
return &RedisLock{
redisClient: redisClient,
key: lockKey,
value: util.RandomString(16), // 生成16位随机唯一值
expiry: 3 * time.Second,
}
}
// Lock 尝试获取锁(支持重试,秒杀场景重试次数建议≤3)
func (l *RedisLock) Lock(ctx context.Context, retry int) (bool, error) {
for i := 0; i < retry; i++ {
// SET NX EX 原子操作:不存在则设置,同时指定过期时间
success, err := l.redisClient.SetNX(ctx, l.key, l.value, l.expiry).Result()
if err != nil {
return false, err
}
if success {
return true, nil
}
time.Sleep(100 * time.Millisecond) // 重试间隔100ms,减轻Redis压力
}
return false, nil
}
// Unlock 安全释放锁(Lua脚本确保原子性)
func (l *RedisLock) Unlock(ctx context.Context) error {
unlockScript := `
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
end
return 0
`
_, err := l.redisClient.Eval(ctx, unlockScript, []string{l.key}, l.value).Result()
return err
}
秒杀开始前30分钟触发预热,将活动商品数据加载至Redis,避免活动初期缓存穿透。通过分布式锁防止重复预热,支持按活动批量处理。
// PreheatService 缓存预热服务
type PreheatService struct {
goodsRepo *repository.FlashSaleGoodsRepo
redisClient *redis.Client
logger *glog.Logger
}
// PreheatByActivity 按活动ID批量预热商品
func (s *PreheatService) PreheatByActivity(ctx context.Context, activityId uint32) error {
// 1. 获取预热锁,防止多实例重复执行
lock := NewRedisLock(s.redisClient, activityId)
lockSuccess, err := lock.Lock(ctx, 2)
if err != nil {
s.logger.Error(ctx, "预热锁获取失败", g.Map{"activity_id": activityId, "err": err})
return err
}
if !lockSuccess {
s.logger.Info(ctx, "活动已在预热中", g.Map{"activity_id": activityId})
return nil
}
defer lock.Unlock(ctx)
// 2. 查询活动下所有商品(分页查询避免数据量过大)
goodsList, err := s.goodsRepo.GetByActivityId(ctx, activityId, 1, 1000)
if err != nil {
return err
}
// 3. 批量缓存商品信息与库存
pipe := s.redisClient.Pipeline()
for _, goods := range goodsList {
// 缓存商品基本信息
infoKey := fmt.Sprintf(FlashSaleGoodsInfoKey, goods.GoodsId)
pipe.HSet(ctx, infoKey, gconv.Map(&FlashSaleGoodsCache{
GoodsId: goods.GoodsId,
ActivityId: goods.ActivityId,
Price: goods.FlashPrice,
MaxBuy: goods.MaxBuy,
StartTime: goods.StartTime,
EndTime: goods.EndTime,
}))
// 设置过期时间(活动结束后24小时)
expiry := time.Duration(goods.EndTime - time.Now().Unix() + 86400) * time.Second
pipe.Expire(ctx, infoKey, expiry)
// 初始化库存(从数据库同步)
stockKey := fmt.Sprintf(FlashSaleGoodsStockKey, goods.GoodsId)
pipe.Set(ctx, stockKey, goods.Stock, expiry)
// 加入活动商品列表
activityGoodsKey := fmt.Sprintf(FlashSaleActivityGoods, activityId)
pipe.SAdd(ctx, activityGoodsKey, goods.GoodsId)
pipe.Expire(ctx, activityGoodsKey, expiry)
}
_, err = pipe.Exec(ctx)
return err
}
采用“先删缓存再更新数据库”+“延迟双删”策略,解决秒杀场景下的缓存一致性问题,结合消息队列确保最终一致。
// 库存更新后的缓存一致性处理
func (s *StockServiceImpl) UpdateStockAfterSale(ctx context.Context, goodsId uint32, newStock int) error {
// 1. 先删除Redis缓存(避免脏读)
stockKey := fmt.Sprintf(FlashSaleGoodsStockKey, goodsId)
_, err := s.redisClient.Del(ctx, stockKey).Result()
if err != nil {
s.logger.Warn(ctx, "删除库存缓存失败", g.Map{"goods_id": goodsId, "err": err})
}
// 2. 更新数据库库存(开启事务)
tx := s.db.Begin()
if err := tx.Model(&model.FlashSaleStock{}).
Where("goods_id = ?", goodsId).
Update("available_stock", newStock).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
// 3. 延迟1秒再次删除缓存(解决并发更新问题)
go func() {
time.Sleep(1 * time.Second)
_, _ = s.redisClient.Del(context.Background(), stockKey).Result()
}()
// 4. 发送库存更新消息,用于后续监控与补偿
_ = s.msgService.PublishStockUpdateMsg(ctx, &message.StockUpdateMsg{
GoodsId: goodsId,
NewStock: newStock,
UpdateTime: time.Now().Unix(),
})
return nil
}
秒杀系统数据库需精简字段,聚焦核心业务,同时通过分库分表应对高并发写入,主要包含活动表、商品表、订单表、库存表。
表名 | 核心字段 | 设计说明 |
|---|---|---|
flash_sale_activity | id(主键)、activity_name、start_time、end_time、status、create_time | 存储秒杀活动基本信息,按活动状态建立索引 |
flash_sale_goods | id(主键)、activity_id、goods_id、flash_price、max_buy、sort | 活动与商品关联表,联合索引(activity_id, status) |
flash_sale_stock | id(主键)、goods_id、total_stock、available_stock、locked_stock | 库存表,行级锁优化,避免并发更新冲突 |
flash_sale_order | id(主键)、order_no、user_id、goods_id、activity_id、amount、status | 秒杀订单表,分表字段user_id,联合索引(user_id, goods_id) |
秒杀订单表采用“用户ID哈希分表”,将订单数据分散至8个分表,降低单表并发压力,分表规则通过Sharding-JDBC实现。
// Sharding-JDBC分表配置(秒杀订单表)
spring:
shardingsphere:
rules:
sharding:
tables:
flash_sale_order:
actual-data-nodes: ds0.flash_sale_order_${0..7} # 8个分表
database-strategy:
none: # 单库分表
table-strategy:
standard:
sharding-column: user_id # 分表字段
sharding-algorithm-name: flash_sale_order_inline
sharding-algorithms:
flash_sale_order_inline:
type: INLINE
props:
algorithm-expression: flash_sale_order_${user_id % 8} # 哈希取模分表
props:
sql-show: false # 生产环境关闭SQL日志
结合Redis原子操作与数据库乐观锁,实现双重防超卖,确保库存数据准确。Redis层先进行预扣减,数据库层最终校验。
// 基于Redis Lua脚本的原子库存扣减(防超卖第一步)
const stockReduceLua = `
local stockKey = KEYS[1]
local soldKey = KEYS[2]
local reduceCount = tonumber(ARGV[1])
-- 1. 检查库存是否充足
local stock = tonumber(redis.call('GET', stockKey))
if not stock or stock < reduceCount then
return 0 -- 库存不足
end
-- 2. 原子扣减库存与累加已售量
redis.call('DECRBY', stockKey, reduceCount)
redis.call('INCRBY', soldKey, reduceCount)
return 1 -- 扣减成功
`
// ReduceStock 库存扣减(Redis预扣减+数据库最终确认)
func (s *StockServiceImpl) ReduceStock(ctx context.Context, goodsId uint32, userId uint32, count int) (bool, error) {
stockKey := fmt.Sprintf(FlashSaleGoodsStockKey, goodsId)
soldKey := fmt.Sprintf(FlashSaleGoodsSoldKey, goodsId)
// 1. Redis原子扣减
result, err := s.redisClient.Eval(ctx, stockReduceLua, []string{stockKey, soldKey}, count).Result()
if err != nil || result == 0 {
return false, err
}
// 2. 数据库乐观锁最终确认(防止Redis与DB数据不一致)
rowsAffected, err := s.db.Model(&model.FlashSaleStock{}).
Where("goods_id = ? AND available_stock >= ?", goodsId, count).
Update("available_stock", gorm.Expr("available_stock - ?", count)).
RowsAffected
if err != nil {
// 数据库扣减失败,回滚Redis库存
s.redisClient.IncrBy(ctx, stockKey, int64(count))
s.redisClient.DecrBy(ctx, soldKey, int64(count))
return false, err
}
return rowsAffected > 0, nil
}
从用户身份校验、行为频率限制、设备指纹三个维度实现防刷,避免恶意用户占用秒杀资源。
// AntiBrushService 防刷服务
type AntiBrushService struct {
redisClient *redis.Client
userService *service.UserService
}
// CheckUserValid 校验用户秒杀资格(防刷核心逻辑)
func (s *AntiBrushService) CheckUserValid(ctx context.Context, req *dto.CreateFlashSaleOrderReq) (bool, string) {
userId := req.UserId
goodsId := req.GoodsId
// 1. 校验用户状态(是否为黑名单用户)
if s.userService.IsBlackList(ctx, userId) {
return false, "您的账号存在异常,暂无法参与秒杀"
}
// 2. 限制单用户单商品请求频率(10秒内最多3次)
freqKey := fmt.Sprintf("flash_sale:freq:user:%d:goods:%d", userId, goodsId)
reqCount, _ := s.redisClient.Incr(ctx, freqKey).Result()
if reqCount == 1 {
s.redisClient.Expire(ctx, freqKey, 10*time.Second)
}
if reqCount > 3 {
return false, "请求过于频繁,请稍后再试"
}
// 3. 校验用户购买记录(是否已购买该商品)
buyKey := fmt.Sprintf(FlashSaleUserBuyKey, userId, goodsId)
if s.redisClient.SIsMember(ctx, buyKey, goodsId).Val() {
return false, "您已购买过该秒杀商品,请勿重复提交"
}
// 4. 设备指纹校验(非核心逻辑,可集成第三方SDK)
// deviceValid := s.checkDeviceFingerprint(ctx, req.DeviceId)
// if !deviceValid {
// return false, "设备异常,暂无法参与秒杀"
// }
return true, ""
}
基于Prometheus+Grafana构建监控体系,重点监控流量、库存、订单、服务健康四类指标,确保问题早发现。
// 基于Prometheus AlertManager的告警规则配置
groups:
- name: flash_sale_alerts
rules:
# 1. 网关QPS突增告警
- alert: GatewayQpsSurge
expr: sum(rate(gateway_requests_total[5m])) / sum(rate(gateway_requests_total[15m])) > 1.5
for: 1m
labels:
severity: critical
annotations:
summary: "秒杀网关QPS突增"
description: "网关QPS在5分钟内较15分钟前增长超过50%,当前QPS: {{ $value }}"
# 2. 库存差异告警
- alert: StockInconsistency
expr: abs(redis_stock_total - db_stock_total) > 10
for: 2m
labels:
severity: warning
annotations:
summary: "秒杀库存数据不一致"
description: "Redis与DB库存差异超过10,Redis库存: {{ $labels.redis_stock_total }}, DB库存: {{ $labels.db_stock_total }}"
# 3. 服务响应时间告警
- alert: ServiceResponseSlow
expr: histogram_quantile(0.99, sum(rate(service_request_duration_seconds_bucket[5m])) by (le, service)) > 0.5
for: 1m
labels:
severity: critical
annotations:
summary: "秒杀服务响应缓慢"
description: "{{ $labels.service }}服务P99响应时间超过500ms,当前值: {{ $value }}s"
采用“多活部署”架构,核心服务(秒杀服务、库存服务)部署至少3个节点,Redis采用主从+哨兵模式,RabbitMQ集群部署确保消息不丢失。
// Redis主从哨兵配置(简化)
sentinel monitor mymaster 192.168.1.100 6379 2 # 主节点地址,2个哨兵确认主节点故障
sentinel down-after-milliseconds mymaster 3000 # 3秒无响应标记为故障
sentinel failover-timeout mymaster 10000 # 故障转移超时时间10秒
sentinel parallel-syncs mymaster 1 # 故障转移后同步从节点数量
// RabbitMQ集群配置(镜像队列)
rabbitmqctl set_policy ha-all "^flash_sale_" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# 对flash_sale_前缀的队列启用镜像队列,所有节点同步消息
本秒杀系统通过“限流-削峰-缓存-异步”核心架构,解决了高并发场景下的性能与数据一致性问题。后续优化可聚焦三个方向: