首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Asyncio请求队列

Asyncio请求队列
EN

Code Review用户
提问于 2018-07-31 01:15:40
回答 1查看 825关注 0票数 6

我使用异步和aiohttp编写了一个简单的sitemap.xml检查器。我遵循了演示生产者/消费者模式的文档。但是,我注意到随着URL的扩展,它的性能似乎变慢了。我做错什么了吗?我能提高请求速度吗?

用例

check()被赋予具有~310个链接的https://www.google.com/flights/sitemap.xml时,大约需要00:03:24分钟才能完成。如果需要,Github 来源代码是可用的。

代码语言:javascript
复制
# -*- coding: utf-8 -*-

from timeit import default_timer as timer
from sys import exit as abort

import time
import sys
import logging
import asyncio
import aiohttp
import defusedxml.ElementTree

class Logger(object):

    FMT = '%(name)s: %(levelname)s: %(message)s'

    def __init__(self):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(level=logging.INFO)

        stdout = logging.StreamHandler(stream=sys.stdout)
        stderr = logging.StreamHandler(stream=sys.stderr)

        stdout.setLevel(level=logging.INFO)
        stderr.setLevel(level=logging.WARNING)

        stdout.addFilter(lambda record: record.levelno == logging.INFO)

        stdout.setFormatter(
            logging.Formatter(
                fmt=self.FMT,
                datefmt=None,
                style='%'))
        stderr.setFormatter(
            logging.Formatter(
                fmt=self.FMT,
                datefmt=None,
                style='%'))

        self._logger.addHandler(hdlr=stdout)
        self._logger.addHandler(hdlr=stderr)

    def __del__(self):
        if not self._logger.hasHandlers():
            return
        for handler in self._logger.handlers:
            if isinstance(handler, logging.StreamHandler):
                handler.flush()
                handler.close()
            self._logger.removeHandler(handler)


class Config(object):

    """Base Config."""

    LIMIT = 100
    TIMEOUT = None
    USER_AGENT = 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)'
    MAXSIZE = 0


class ProdConfig(Config):

    """Prod Config."""

    TIMEOUT = 8
    USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36'
    MAXSIZE = 500


class Checker(object):

    """Sitemap Checker."""

    def __init__(self):
        self._logger = Logger()
        self._loop = asyncio.get_event_loop()
        self._queue = asyncio.Queue(
            maxsize=ProdConfig.MAXSIZE, loop=self._loop)

    def check(self, url):
        """Main() entry-point."""
        start = timer()
        self._loop.run_until_complete(self._fetch_links(url))
        elapsed = time.strftime(
            '%H:%M:%S', time.gmtime(timer() - start))
        self._logger._logger.info('time elapsed {}'.format(elapsed))

    async def _fetch_doc(self, client, url):
        """Fetch a sitemap.xml document."""
        self._logger._logger.info('fetching sitemap @ {}'.format(url))
        try:
            async with client.get(
                url=url,
                allow_redirects=True,
                timeout=ProdConfig.TIMEOUT, 
                verify_ssl=True
                    if url.startswith('https') else False) as response:
                response.raise_for_status()
                return await response.text()
        except aiohttp.ClientResponseError as error:
            self._logger._logger.error(
                'sitemap yielded <{}>'.format(
                    error.status))
        except aiohttp.ClientError as error:
            self._logger._logger.error(str(error))
        abort(1)

    async def _producer(self, doc):
        """Parse sitemap.xml and queue discovered links."""
        try:
            root = defusedxml.ElementTree.fromstring(doc)
        except defusedxml.ElementTree.ParseError:
            self._logger._logger.error('failed to parse *.xml document')
            abort(1)
        self._logger._logger.info(
            '*.xml document contains ({}) links'.format(
                len(root)))
        for link in root:
            if link:
                await self._queue.put(''.join(link[0].text.split()))

    async def _consumer(self, client):
        """Process queued links with HEAD requests."""
        while True:
            url = await self._queue.get()
            async with client.head(
                    url=url,
                    allow_redirects=True,
                    timeout=ProdConfig.TIMEOUT, 
                    verify_ssl=True if url.startswith('https') else False) as http:
                self._logger._logger.info(
                    '<{}> {} - {}'.format(http.status, http.reason, url))
                self._queue.task_done()

    async def _fetch_links(self, url):
        """Fetch sitemap.xml links."""
        headers = {'User-Agent': ProdConfig.USER_AGENT}
        connector = aiohttp.TCPConnector(
            limit=ProdConfig.LIMIT, loop=self._loop)
        async with aiohttp.ClientSession(
                connector=connector, loop=self._loop, headers=headers) as client:
            doc = await self._fetch_doc(client, url)
            consumer = asyncio.ensure_future(self._consumer(client))
            await self._producer(doc)
            await self._queue.join()
            consumer.cancel()

    def __del__(self):
        if self._loop:
            if not self._loop.is_running:
                self._loop.close()

if __name__ == '__main__':
    Checker().check(sys.argv[1])
EN

回答 1

Code Review用户

发布于 2023-01-06 22:32:25

这个问题是关于过去的时间。请更新它,包括cProfile观察或类似的挂钟图。

报告的吞吐量表明,处理每个URL需要660毫秒的挂钟时间。注意其中有多少毫秒是本地CPU繁忙时间,这将是有帮助的。了解第95百分位数的响应时间离中位数有多远是很有趣的。

目前尚不清楚谷歌终端服务器或e2e互联网管道是否能够支持更高的请求率,问题不包括ab或类似的基准数据。

风格nit:

代码语言:javascript
复制
class Logger(object):

更喜欢class Logger:。是的,我们知道它是从object继承的。在python2中,这曾经对MRO产生了影响,但是自从日落之后,这就不是很重要了。

源代码看起来很好,没有明显的缺陷。

把它运出去。

票数 1
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/200631

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档