首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >redis实现一个排行榜功能

redis实现一个排行榜功能

原创
作者头像
用户12278826
发布2026-03-19 05:19:22
发布2026-03-19 05:19:22
30
举报

import redis import json from typing import List, Optional, Tuple from datetime import datetime

class LeaderboardService: """基于 Redis Sorted Set 的排行榜服务"""

代码语言:javascript
复制
python 体验AI代码助手 代码解读复制代码def __init__(self, redis_client: redis.Redis):
    self.r = redis_client
    self.key_prefix = "leaderboard"

def _get_key(self, board_id: str) -> str:
    """生成排行榜 key"""
    return f"{self.key_prefix}:{board_id}"

def add_score(self, board_id: str, user_id: str, score: float, 
              extra_data: dict = None) -> float:
    """
    添加/更新用户分数
    
    Args:
        board_id: 排行榜ID(如 "weekly", "global")
        user_id: 用户ID
        score: 分数(支持小数)
        extra_data: 额外数据(JSON序列化存储)
    """
    key = self._get_key(board_id)
    
    # 使用管道保证原子性
    pipe = self.r.pipeline()
    
    # 添加分数
    pipe.zadd(key, {user_id: score})
    
    # 如果有额外数据,存储到 Hash
    if extra_data:
        hash_key = f"{key}:info:{user_id}"
        pipe.hset(hash_key, mapping={
            "data": json.dumps(extra_data),
            "updated_at": datetime.now().isoformat()
        })
        # 设置过期时间(与排行榜一致)
        pipe.expire(hash_key, 86400 * 7)  # 7天
    
    pipe.execute()
    return score

def increment_score(self, board_id: str, user_id: str, 
                    delta: float) -> float:
    """
    原子增加分数(常用于实时计分)
    
    Returns:
        增加后的新分数
    """
    key = self._get_key(board_id)
    new_score = self.r.zincrby(key, delta, user_id)
    return float(new_score)

def get_top_n(self, board_id: str, n: int = 10, 
              with_scores: bool = True) -> List[dict]:
    """
    获取前 N 名
    
    Returns:
        [{"rank": 1, "user_id": "xxx", "score": 1000, "data": {...}}, ...]
    """
    key = self._get_key(board_id)
    
    # ZREVRANGE: 从高到低(默认)
    results = self.r.zrevrange(
        key, 0, n - 1, 
        withscores=with_scores
    )
    
    top_list = []
    for i, item in enumerate(results, 1):
        if with_scores:
            user_id, score = item
            user_id = user_id.decode() if isinstance(user_id, bytes) else user_id
            score = float(score)
        else:
            user_id = item.decode() if isinstance(item, bytes) else item
            score = None
        
        # 获取额外数据
        extra_data = self._get_user_extra_data(key, user_id)
        
        top_list.append({
            "rank": i,
            "user_id": user_id,
            "score": score,
            "data": extra_data
        })
    
    return top_list

def get_user_rank(self, board_id: str, user_id: str) -> Optional[dict]:
    """
    获取用户排名和分数
    
    Returns:
        None 表示不在排行榜中
    """
    key = self._get_key(board_id)
    
    # 排名(0-based,需要+1)
    rank = self.r.zrevrank(key, user_id)
    if rank is None:
        return None
    
    score = self.r.zscore(key, user_id)
    extra_data = self._get_user_extra_data(key, user_id)
    
    return {
        "rank": rank + 1,  # 转为 1-based
        "user_id": user_id,
        "score": float(score),
        "data": extra_data
    }

def get_nearby_users(self, board_id: str, user_id: str, 
                     range_size: int = 5) -> List[dict]:
    """
    获取用户附近的排名(如前5名、后5名)
    
    常用于游戏:显示"你前面还有谁"
    """
    key = self._get_key(board_id)
    
    # 获取用户排名
    user_rank = self.r.zrevrank(key, user_id)
    if user_rank is None:
        return []
    
    # 计算范围
    start = max(0, user_rank - range_size)
    end = user_rank + range_size
    
    # 获取范围内所有人
    results = self.r.zrevrange(key, start, end, withscores=True)
    
    nearby = []
    for i, (uid, score) in enumerate(results, start=start + 1):
        uid = uid.decode() if isinstance(uid, bytes) else uid
        nearby.append({
            "rank": i,
            "user_id": uid,
            "score": float(score),
            "is_current_user": uid == user_id
        })
    
    return nearby

def _get_user_extra_data(self, key: str, user_id: str) -> Optional[dict]:
    """获取用户额外数据"""
    hash_key = f"{key}:info:{user_id}"
    data = self.r.hget(hash_key, "data")
    if data:
        return json.loads(data)
    return None

def remove_user(self, board_id: str, user_id: str) -> bool:
    """从排行榜移除用户"""
    key = self._get_key(board_id)
    pipe = self.r.pipeline()
    pipe.zrem(key, user_id)
    pipe.delete(f"{key}:info:{user_id}")
    pipe.execute()
    return True

def clear_board(self, board_id: str) -> bool:
    """清空排行榜"""
    key = self._get_key(board_id)
    # 使用 SCAN 避免阻塞
    for info_key in self.r.scan_iter(match=f"{key}:info:*"):
        self.r.delete(info_key)
    self.r.delete(key)
    return True

def get_board_stats(self, board_id: str) -> dict:
    """获取排行榜统计信息"""
    key = self._get_key(board_id)
    
    total = self.r.zcard(key)
    if total == 0:
        return {"total": 0}
    
    # 获取分数范围
    min_score = self.r.zrange(key, 0, 0, withscores=True)[0][1]
    max_score = self.r.zrevrange(key, 0, 0, withscores=True)[0][1]
    
    return {
        "total": total,
        "min_score": float(min_score),
        "max_score": float(max_score),
        "avg_score": float(self.r.zscore(key, "__avg__") or 0)  # 需要额外维护
    }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档