首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用异步抓取数据时内存利用率不断增加

使用异步抓取数据时内存利用率不断增加
EN

Stack Overflow用户
提问于 2022-06-01 20:36:21
回答 1查看 236关注 0票数 2

我正在使用异步抓取数据,并将数据存储在Redis数据库中。我的代码运行良好,但是linux服务器上的内存利用率一直在增加,直到达到100%,然后冻结服务器。我必须手动重新启动服务器并重新启动脚本。我使用两个凭据来访问api端点,以尽可能快地获取数据。

下面是示例代码:

代码语言:javascript
复制
from asyncio import tasks
from datetime import datetime, timedelta
from multiprocessing import Semaphore
from socket import timeout
import time
import asyncio
from aiohttp import ClientSession
from requests.exceptions import HTTPError
import config
import json
import pandas as pd
from loguru import logger
import pytz
import aioredis
from redis import Redis

RESULTS = []
result_dict = {}


redis = Redis(
    host="host",
    port=6379,
    decode_responses=True,
    # ssl=True,
    username="default",
    password="password",
)


async def get(url, session):
    try:
        response = await session.request(method="GET", url=url, timeout=1)
    except Exception as err:
        response = await session.request(method="GET", url=url, timeout=3)
    pokemon = await response.json()
    return pokemon["name"]


async def run_program(url, session, semaphore):
    async with semaphore:
        try:
            pokemon_name = await get(url, session)
            await publish(pokemon_name)
        except:
            pass


async def main():
    header_dict = {
        "header1": {
            # Request headers
            # "API-Key-1": config.PRIMARY_API_KEY,
            "Cache-Control": "no-cache",
        },
        "header2": {
            # "API-Key-2": config.SECONDARY_API_KEY,
            "Cache-Control": "no-cache",
        },
    }
    semaphore = asyncio.BoundedSemaphore(20)
    tasks = []
    for key, value in header_dict.items():
        # logger.info(value)
        async with ClientSession(headers=value) as session:
            for i in range(0, 5):
                URLS = f"https://pokeapi.co/api/v2/pokemon/{i}"
                tasks.append(
                    asyncio.ensure_future(run_program(URLS, session, semaphore))
                )
            await asyncio.gather(*tasks)


async def publish(data):
    if not data.empty:
        try:
            keyName = "channelName"
            value = data
            redis.set(keyName, value)
            print("inserting")
        except:
            pass
    else:
        pass


while True:
    try:
        asyncio.run(main(), debug=True)
    except Exception as e:
        time.sleep(1)
        asyncio.run(main(), debug=True)

我想知道为什么内存消耗在增加,以及如何阻止它。

以下是内存利用率随时间变化的百分比图像。除了这个Linux服务器之外,没有其他脚本运行在同一台Linux服务器上。

EN

回答 1

Stack Overflow用户

发布于 2022-06-02 14:56:35

记忆被舔的原因有很多。

您正在连接到Redis的connection.

  • When

  • ,并且永远不要关闭您设置的timeout=1 --它很可能会引发异常,这可能是占用内存的主要原因(参见:Python not catching MemoryError)

  • The会话是在头上的每次迭代中创建的。在本例中,这是两个,但不确定真正的标题列表size.

  • tasks在调用gather后不会变为空。

我试图优化代码,下面是我得到的。

代码语言:javascript
复制
import asyncio
import time

from aiohttp import ClientSession
from redis import DataError
from redis import Redis


async def publish(data, redis):
    if not data.empty:
        try:
            redis.set("channelName", data)
        except (DataError, Exception):
            pass


async def run_program(url, session, headers, semaphore, redis):
    async with semaphore:
        try:
            response = await session.request(method="GET", url=url, headers=headers)
            pokemon = await response.json()
            pokemon_name = pokemon.get("name")
            await publish(pokemon_name, redis)
        except:
            pass


async def main():
    header_dict = {
        "header1": {
            # Request headers
            "Cache-Control": "no-cache",
        },
        "header2": {
            "Cache-Control": "no-cache",
        },
    }
    semaphore = asyncio.BoundedSemaphore(20)
    async with ClientSession() as session:
        for headers in header_dict.values():
            with Redis(host="host", port=6379, decode_responses=True, username="default", password="password") as redis:
                await asyncio.gather(*[
                    asyncio.ensure_future(
                        run_program(f"https://pokeapi.co/api/v2/pokemon/{i}", session, headers, semaphore, redis)
                    ) for i in range(5)
                ])


while True:
    try:
        asyncio.run(main(), debug=True)
    except Exception as e:
        time.sleep(1)
        asyncio.run(main(), debug=True)

所有这些更改都应该优化内存的使用。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72467765

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档