首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >aiohttp:速率限制并行请求

aiohttp:速率限制并行请求
EN

Stack Overflow用户
提问于 2018-02-08 09:39:49
回答 5查看 16.1K关注 0票数 33

API通常有用户必须遵守的速率限制。例如,让我们以50个请求/秒为例。顺序请求花费0.5秒-1秒,因此速度太慢,无法接近这个限制。然而,使用aiohttp的并行请求超过了速率限制。

为了尽可能快地轮询API,需要限制并行调用。

到目前为止,我发现了一些装饰session.get的例子,大致如下:

代码语言:javascript
复制
session.get = rate_limited(max_calls_per_second)(session.get)

这对于连续调用很好。试图在并行调用中实现这一点并不像预期的那样工作。

下面是一些代码作为示例:

代码语言:javascript
复制
async with aiohttp.ClientSession() as session:
    session.get = rate_limited(max_calls_per_second)(session.get)
    tasks = (asyncio.ensure_future(download_coroutine(  
          timeout, session, url)) for url in urls)
    process_responses_function(await asyncio.gather(*tasks))

问题在于它将限制任务的排队。使用gather的执行仍然或多或少地同时进行。这两个世界中最糟糕的;-)

是的,我在这里发现了一个类似的问题,设置每秒最大请求数,但是没有回答限制请求速率的实际问题。另外,Quentin Pradet的博客文章只在限制排队速率的情况下工作。

结束它:如何限制并行aiohttp请求每秒的请求数?

EN

回答 5

Stack Overflow用户

回答已采纳

发布于 2018-02-08 09:54:12

如果我很了解你,你想限制同步请求的数量吗?

asyncio中有一个名为Semaphore的对象,它的工作方式类似于异步RLock

代码语言:javascript
复制
semaphore = asyncio.Semaphore(50)
#...
async def limit_wrap(url):
    async with semaphore:
        # do what you want
#...
results = asyncio.gather([limit_wrap(url) for url in urls])

已更新

假设我发出50个并发请求,它们都在2秒内完成。因此,它没有触及限制(每秒钟只有25次请求)。

这意味着我应该发出100个并发请求,它们也都在2秒内完成(每秒钟50个请求)。但在你真正提出这些请求之前,你如何确定它们会完成多长时间呢?

或者,如果您不介意每秒完成请求,那么,但是请求每秒发出。您可以:

代码语言:javascript
复制
async def loop_wrap(urls):
    for url in urls:
        asyncio.ensure_future(download(url))
        await asyncio.sleep(1/50)

asyncio.ensure_future(loop_wrap(urls))
loop.run_forever()

上面的代码将每秒钟创建一个Future实例。

票数 21
EN

Stack Overflow用户

发布于 2020-02-22 23:21:07

我通过创建一个aiohttp.ClientSession()的子类来解决这个问题,该子类基于漏桶算法创建了一个消除棘轮的子类。我使用asyncio.Queue()来消除等级,而不是Semaphores。我只重写了_request()方法。我发现这种方法更干净,因为您只将session = aiohttp.ClientSession()替换为session = ThrottledClientSession(rate_limit=15)

代码语言:javascript
复制
class ThrottledClientSession(aiohttp.ClientSession):
    """
    Rate-throttled client session class inherited from aiohttp.ClientSession)

    USAGE:
        replace `session = aiohttp.ClientSession()`
        with `session = ThrottledClientSession(rate_limit=15)`

    see https://stackoverflow.com/a/60357775/107049
    """

    MIN_SLEEP = 0.1

    def __init__(self, rate_limit: float = None, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.rate_limit = rate_limit
        self._fillerTask = None
        self._queue = None
        self._start_time = time.time()
        if rate_limit is not None:
            if rate_limit <= 0:
                raise ValueError('rate_limit must be positive')
            self._queue = asyncio.Queue(min(2, int(rate_limit) + 1))
            self._fillerTask = asyncio.create_task(self._filler(rate_limit))

    def _get_sleep(self) -> Optional[float]:
        if self.rate_limit is not None:
            return max(1 / self.rate_limit, self.MIN_SLEEP)
        return None

    async def close(self) -> None:
        """Close rate-limiter's "bucket filler" task"""
        if self._fillerTask is not None:
            self._fillerTask.cancel()
        try:
            await asyncio.wait_for(self._fillerTask, timeout=0.5)
        except asyncio.TimeoutError as err:
            print(str(err))
        await super().close()

    async def _filler(self, rate_limit: float = 1):
        """Filler task to fill the leaky bucket algo"""
        try:
            if self._queue is None:
                return
            self.rate_limit = rate_limit
            sleep = self._get_sleep()
            updated_at = time.monotonic()
            fraction = 0
            extra_increment = 0
            for i in range(0, self._queue.maxsize):
                self._queue.put_nowait(i)
            while True:
                if not self._queue.full():
                    now = time.monotonic()
                    increment = rate_limit * (now - updated_at)
                    fraction += increment % 1
                    extra_increment = fraction // 1
                    items_2_add = int(min(self._queue.maxsize - self._queue.qsize(), int(increment) + extra_increment))
                    fraction = fraction % 1
                    for i in range(0, items_2_add):
                        self._queue.put_nowait(i)
                    updated_at = now
                await asyncio.sleep(sleep)
        except asyncio.CancelledError:
            print('Cancelled')
        except Exception as err:
            print(str(err))

    async def _allow(self) -> None:
        if self._queue is not None:
            # debug
            # if self._start_time == None:
            #    self._start_time = time.time()
            await self._queue.get()
            self._queue.task_done()
        return None

    async def _request(self, *args, **kwargs)  -> aiohttp.ClientResponse:
        """Throttled _request()"""
        await self._allow()
        return await super()._request(*args, **kwargs)
票数 16
EN

Stack Overflow用户

发布于 2020-08-30 05:14:33

我喜欢@sraw以异步的方式来处理这个问题,但是他们的回答并没有完全打断我的意思。因为我不知道我的下载调用是否会比速率限制更快或更慢,所以我想选择在请求慢的时候并行运行多个,在请求非常快的时候运行一个,所以我总是在速率限制上正确。

我这样做的方法是与一个生产者一起使用一个队列,该队列在速率限制的情况下生成新任务,然后许多消费者要么等待下一个任务,如果他们速度快,要么在队列中备份工作,如果他们速度慢,并且在处理器/网络允许的范围内运行。

代码语言:javascript
复制
import asyncio
from datetime import datetime 

async def download(url):
  # download or whatever
  task_time = 1/10
  await asyncio.sleep(task_time)
  result = datetime.now()
  return result, url

async def producer_fn(queue, urls, max_per_second):
  for url in urls:
    await queue.put(url)
    await asyncio.sleep(1/max_per_second)
 
async def consumer(work_queue, result_queue):
  while True:
    url = await work_queue.get()
    result = await download(url)
    work_queue.task_done()
    await result_queue.put(result)

urls = range(20)
async def main():
  work_queue = asyncio.Queue()
  result_queue = asyncio.Queue()

  num_consumer_tasks = 10
  max_per_second = 5
  consumers = [asyncio.create_task(consumer(work_queue, result_queue))
               for _ in range(num_consumer_tasks)]    
  producer = asyncio.create_task(producer_fn(work_queue, urls, max_per_second))
  await producer

  # wait for the remaining tasks to be processed
  await work_queue.join()
  # cancel the consumers, which are now idle
  for c in consumers:
    c.cancel()

  while not result_queue.empty():
    result, url = await result_queue.get()
    print(f'{url} finished at {result}')
 
asyncio.run(main())
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48682147

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档