首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用于速率限制异步调用的令牌桶上下文管理器

用于速率限制异步调用的令牌桶上下文管理器
EN

Code Review用户
提问于 2019-10-17 15:03:05
回答 1查看 168关注 0票数 3

我已经编写了一个类,它实现了类似于令牌桶算法的东西,这样就可以限制从应用程序发出的aysnc请求。

代码可以工作,但我仍然不确定它是否“正确”,可能只是因为我刚开始使用Python进行异步编程。

具体关切:

  • 状态(self.lastself.tokens)如何被管理(或不被管理)感觉很奇怪/很糟糕,这意味着在_refill方法中进行检查。
  • 也许我不应该为此使用上下文管理器(__aenter____aexit__)。
  • 我不确定我是否应该在__aexit__做任何清理工作
  • 算法的实现可能不会100%忠实,因为我等待桶是空的,然后再作为一批灌装。我认为这是可以的,因为每个请求只能使用一个令牌。
代码语言:javascript
复制
"""
Typical usage example:
    async def rateLimitedTask(bucket):
        await bucket.removeToken() #wait for token
        #do the task which has now been rate limited
        ...

    async with TokenBucket(2, 1/5) as bucket:
        #make arbitrary async calls to rateLimitedTask()
        ...
"""

import asyncio
import time
from collections import deque

class TokenBucket():
    """Context manager which provides a token bucket.

    Attributes:
        tokens (collections.deque): The tokens (which manifest as timestamps)
                                    in reverse chronological order.
        last (float):               The most recent token consumed.
        rate (float):               See __init__ arg `tokensPerS`.
        capacity (int):             See __init__ arg `capacity`.
        sleepDuration (float):      See __init__ arg `refillSleepS`.

    """

    def __init__(self, capacity=1, tokensPerS=1.0, refillSleepS=0.1):
        """Initialises the TokenBucket context manager.

        Args:
            capacity (int, optional):       The maxiumum tokens the bucket can
                                            hold. Larger is burstier.
                                            Defaults to 1.
            tokensPerS (float, optional):   The average replenishment rate in
                                            tokens per second.
                                            Defaults to 1.
            refillSleepS (float, optional): If not enough time has passed
                                            to refill; how many seconds to
                                            sleep before trying again.
                                            Defaults to 0.1.
        """
        self.capacity = capacity
        self.rate = tokensPerS
        self.last = None
        self.tokens = deque([])
        self.sleepDuration = refillSleepS

    def hasTokens(self):
        return len(self.tokens) > 0

    def isEmpty(self):
        return len(self.tokens) < 1

    async def _refill(self):
        """Refill the empty bucket back up to capacity.

        Is called only when the bucket is empty.
        The tokens themselves are timestamps of when the token was created.
        Uses monotonic time to safeguard against system clock changes.
        """
        if self.last: #prevents unnecessary run on virgin execution
            while self.capacity / (time.monotonic() - self.last) > self.rate:
                await asyncio.sleep(self.sleepDuration)
                if self.hasTokens():
                    return #another call filled the bucket already

        for _ in range(self.capacity):
            self.tokens.appendleft(time.monotonic())

    async def removeToken(self):
        """Removes a token from the bucket and returns void.

        If the bucket is empty it will await its refill before returning.
        """
        if self.isEmpty():
            await self._refill()

        self.last = self.tokens.pop()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        pass

任何帮助都将不胜感激。

EN

回答 1

Code Review用户

发布于 2019-10-23 01:57:48

父级无类

代码语言:javascript
复制
class TokenBucket():

可以把孩子们扔了。

lower_snake_case

这些变量如下:

  • tokensPerS
  • refillSleepS
  • sleepDuration

等等,以及您的所有方法,按照惯例,应该使用lower_snake_case而不是camelCase。

类型提示

PEP484允许你转动这个

代码语言:javascript
复制
def hasTokens(self):

投入到这个

代码语言:javascript
复制
def hasTokens(self) -> bool:

同样,对于其他方法参数和返回也是如此。除其他外,它将允许您从如下评论中删除您的类型:

代码语言:javascript
复制
"""
    tokens (collections.deque): The tokens (which manifest as timestamps)
                                in reverse chronological order.
    last (float):               The most recent token consumed.
    rate (float):               See __init__ arg `tokensPerS`.
    capacity (int):             See __init__ arg `capacity`.
    sleepDuration (float):      See __init__ arg `refillSleepS`.
"""

因为他们已经在签名里了。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/230911

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档