
python asyncio & httpx
Stack Overflow user

Asked on 2021-05-27 06:05:16
3 answers · Viewed 540 times · Score 1

I'm very new to asynchronous programming, and I've been experimenting with httpx. I have the following code, and I'm sure I'm doing something wrong -- I just don't know what. There are two methods, one synchronous and one asynchronous. Both pull from Google Finance. On my system the elapsed times were:

Async: 5.015218734741211

Sync: 5.173618316650391

The code is below:

import httpx
import asyncio
import time



#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
def sync_pull(url):
  r = httpx.get(url)
  print(r.status_code)


#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
async def async_pull(url):
  async with httpx.AsyncClient() as client:
    r = await client.get(url)
    print(r.status_code)


#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
if __name__ == "__main__":

  goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
  tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL', 
             'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
             ]  

  print("Running asynchronously...")
  async_start = time.time()
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    asyncio.run(async_pull(url))
  async_end = time.time()
  print(f"Time lapsed is: {async_end - async_start}")


  print("Running synchronously...")
  sync_start = time.time()
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    sync_pull(url)
  sync_end = time.time()
  print(f"Time lapsed is: {sync_end - sync_start}")

I had hoped the async method would take a fraction of the time the synchronous method takes. What am I doing wrong?


3 Answers

Stack Overflow user

Answered on 2021-05-27 06:41:40

When you call asyncio.run(async_pull(url)), you're saying "run async_pull and wait for the result to come back". Since you do this once per ticker in a loop, you're essentially using asyncio to run synchronously, and you won't see any performance benefit.

What you need to do is create several of the async calls and run them concurrently. There are a few ways to do this; the easiest is to use asyncio.gather (see https://docs.python.org/3/library/asyncio-task.html#asyncio.gather), which takes a sequence of coroutines and runs them concurrently. Adapting your code is fairly straightforward: you create an async function that takes the list of urls, calls async_pull on each of them, passes those to asyncio.gather, and awaits the results. Adapting your code to this looks as follows:

import httpx
import asyncio
import time

def sync_pull(url):
    r = httpx.get(url)
    print(r.status_code)

async def async_pull(url):
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        print(r.status_code)


async def async_pull_all(urls):
    return await asyncio.gather(*[async_pull(url) for url in urls])

if __name__ == "__main__":

    goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
    tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
           'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
           ]

    print("Running asynchronously...")
    async_start = time.time()
    results = asyncio.run(async_pull_all([goog_fin_nyse_url + ticker + ':NYSE' for ticker in tickers]))
    async_end = time.time()
    print(f"Time lapsed is: {async_end - async_start}")


    print("Running synchronously...")
    sync_start = time.time()
    for ticker in tickers:
        url = goog_fin_nyse_url + ticker + ':NYSE'
        sync_pull(url)
    sync_end = time.time()
    print(f"Time lapsed is: {sync_end - sync_start}")

Running it this way, the async version takes about one second for me, versus about seven seconds running synchronously.
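The difference comes down to where the awaiting happens. A stdlib-only sketch (using asyncio.sleep as a stand-in for the HTTP call; the delays are illustrative) shows why one asyncio.run() per coroutine is sequential while a single gather is concurrent:

```python
import asyncio
import time


async def simulated_pull(delay):
    # Stand-in for an HTTP request: awaiting yields control to the
    # event loop, just as awaiting client.get(url) does
    await asyncio.sleep(delay)
    return delay


def run_sequentially(delays):
    # One asyncio.run() per coroutine: each call blocks until done,
    # so total time is roughly the SUM of the delays
    start = time.time()
    for d in delays:
        asyncio.run(simulated_pull(d))
    return time.time() - start


def run_concurrently(delays):
    # One event loop, all coroutines gathered: total time is roughly
    # the MAX of the delays
    async def main():
        return await asyncio.gather(*(simulated_pull(d) for d in delays))
    start = time.time()
    asyncio.run(main())
    return time.time() - start
```

With five simulated 0.1-second requests, the sequential version takes about 0.5 seconds and the gathered version about 0.1, mirroring the one-second vs seven-second numbers above.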

Score 2

Stack Overflow user

Answered on 2021-08-09 16:10:41

Here's a nice pattern I use (I tweak it a little each time). Typically I make a module async_utils.py and just import the top-level fetching function (e.g. fetch_things here), then my code is free to ignore the internals (other than error handling). You can do it other ways, but I like the 'functional' style of aiostream, and I often find the repeated calls to the process function take certain defaults I set up using functools.partial.

You can pass a tqdm.tqdm progress bar as pbar (initialised with a known size, total=len(things)) to be updated as each async response is processed.

import asyncio
import httpx
from aiostream import stream
from functools import partial

__all__ = ["fetch", "process_thing", "async_fetch_urlset", "fetch_things"]

async def fetch(session, url, raise_for_status=False):
    response = await session.get(str(url))
    if raise_for_status:
        response.raise_for_status()
    return response


async def process_thing(data, things, pbar=None, verbose=False):
    # Map the response back to the thing it came from in the things list
    source_url = data.history[0].url if data.history else data.url
    thing = next(t for t in things if source_url == t.get("thing_url"))
    # Handle `data.content` here, where `data` is the `httpx.Response`
    if verbose:
        print(f"Processing {source_url=}")
    thing.update({"computed_value": "result goes here"})
    if pbar:
        pbar.update()


async def async_fetch_urlset(urls, things, pbar=None, verbose=False, timeout_s=10.0):
    timeout = httpx.Timeout(timeout=timeout_s)
    async with httpx.AsyncClient(timeout=timeout) as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
        process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
        zs = stream.map(ys, process)
        return await zs

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))

In this example the input is a list of dicts (with string keys and values), and the URL is accessed under the key "thing_url", giving things: list[dict[str,str]]. It's nicer to have a dict or object rather than just the URL string when you want to 'map' the result back to the object it came from. The process_thing function can modify the input list things in place (i.e. any changes are not scoped within the function; they take effect in the scope that called it).

You'll often find errors arise during async runs that you don't get when running synchronously, so you'll need to catch them and retry. A common gotcha is retrying at the wrong level (e.g. around the entire loop).

In particular, you'll want to import and catch httpcore.ConnectTimeout, httpx.ConnectTimeout, httpx.RemoteProtocolError, and httpx.ReadTimeout.

Increasing the timeout_s parameter will reduce the frequency of timeout errors by letting the AsyncClient 'wait' longer, but doing so may actually slow your program down (it won't 'fail fast' as quickly).
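Retrying at the level of a single request, rather than the whole batch, can be sketched with stdlib-only code. TransientError here is a hypothetical stand-in for the httpx/httpcore timeout exceptions listed above:

```python
import asyncio


class TransientError(Exception):
    # Hypothetical stand-in for httpx.ConnectTimeout / httpx.ReadTimeout
    pass


async def fetch_with_retry(coro_factory, attempts=3, backoff_s=0.01):
    # Retry ONE request: a fresh coroutine is created per attempt,
    # with linear backoff between attempts. Successful sibling requests
    # in the same gather() are unaffected.
    for attempt in range(attempts):
        try:
            return await coro_factory()
        except TransientError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(backoff_s * (attempt + 1))
```

Wrapping each call this way means a single flaky URL doesn't force the entire URL set to be re-fetched.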

Here's an example of how to use the async_utils module given above:

from async_utils import fetch_things
import httpx
import httpcore

# UNCOMMENT THIS TO SEE ALL THE HTTPX INTERNAL LOGGING
#import logging
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
#console = logging.StreamHandler()
#console.setLevel(logging.DEBUG)
#console.setFormatter(log_format)
#log.addHandler(console)

things = [
    {"url": "https://python.org", "name": "Python"},
    {"url": "https://www.python-httpx.org/", "name": "HTTPX"},
]
#log.debug("URLSET:" + str(list(t.get("url") for t in things)))

def make_urlset(things):
    """Make a URL generator (empty if all have been fetched)"""
    urlset = (t.get("url") for t in things if "computed_value" not in t)
    return urlset

retryable_errors = (
    httpcore.ConnectTimeout,
    httpx.ConnectTimeout, httpx.RemoteProtocolError, httpx.ReadTimeout,
)

# ASYNCHRONOUS:
max_retries = 100
for i in range(max_retries):
    print(f"Retry {i}")
    try:
        urlset = make_urlset(things)
        foo = fetch_things(urls=urlset, things=things, verbose=True)
    except retryable_errors as exc:
        print(f"Caught {exc!r}")
        if i == max_retries - 1:
            raise
    except Exception:
        raise

# SYNCHRONOUS:
#for t in things:
#    resp = httpx.get(t["url"])

In this example, once an async response has been processed successfully I set a key "computed_value" on the dict, which prevents that URL from being fed into the generator on the next pass (when make_urlset is called again). The generator therefore keeps getting smaller. You could do this with a list too, but I find a generator of the URLs still to be pulled works reliably. For objects, you'd change the dict key assignment/access (update/in) to attribute assignment/access (setattr/hasattr).
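The attribute-based variant mentioned above might look like this (a minimal sketch; the Thing class is hypothetical):

```python
class Thing:
    def __init__(self, url, name):
        self.url = url
        self.name = name


def make_urlset(things):
    # hasattr replaces the dict's `in` check; after a successful fetch
    # you would call setattr(thing, "computed_value", result) instead of
    # thing.update({...})
    return (t.url for t in things if not hasattr(t, "computed_value"))
```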

Score 1

Stack Overflow user

Answered on 2021-05-28 04:39:19

I'd like to post a working version of the code using futures -- virtually the same run time:

import httpx
import asyncio
import time

#
#--------------------------------------------------------------------
# Synchronous pull
#--------------------------------------------------------------------
#
def sync_pull(url):
  r = httpx.get(url)
  print(r.status_code)

#
#--------------------------------------------------------------------
# Asynchronous Pull
#--------------------------------------------------------------------
#
async def async_pull(url):
  async with httpx.AsyncClient() as client:
    r = await client.get(url)
    print(r.status_code)

#
#--------------------------------------------------------------------
# Build tasks queue & execute coroutines
#--------------------------------------------------------------------
#
async def build_task() -> None:
  goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
  tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL', 
             'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
             ]  
  tasks= []

  #
  ## The following block of code creates a queue of coroutine
  ## calls
  for ticker in tickers:
    url = goog_fin_nyse_url + ticker + ':NYSE'
    tasks.append(asyncio.ensure_future(async_pull(url)))

  start_time = time.time()

  #
  ## This block of code will dereference the function calls
  ## from the queue, which will cause them all to run
  ## rapidly
  await asyncio.gather(*tasks)

  #
  ## Calculate time lapsed
  finish_time = time.time()
  elapsed_time = finish_time - start_time
  print(f"\n Time spent processing: {elapsed_time} ")


# Start from here
if __name__ == "__main__":
  asyncio.run(build_task())
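A small aside: inside a running event loop (as in build_task above), asyncio.create_task (Python 3.7+) is the preferred spelling of asyncio.ensure_future. A minimal sketch, with asyncio.sleep standing in for the real HTTP call:

```python
import asyncio


async def build_tasks_modern(urls):
    async def pull(url):
        await asyncio.sleep(0)  # placeholder for the real client.get(url)
        return url
    # create_task schedules each coroutine on the running loop immediately
    tasks = [asyncio.create_task(pull(u)) for u in urls]
    # gather preserves the input order in its results
    return await asyncio.gather(*tasks)
```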
Score 0
Original content provided by Stack Overflow; translation supported by Tencent Cloud.
Original link:

https://stackoverflow.com/questions/67713274
