I'm very new to async programming and have been trying out httpx. I have the following code, and I'm sure I'm doing something wrong -- I just don't know what. There are two methods, one synchronous and one asynchronous. Both pull from Google Finance. On my system, the times I see are:
Async: 5.015218734741211
Sync: 5.173618316650391
Here's the code:
import httpx
import asyncio
import time
#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
def sync_pull(url):
    r = httpx.get(url)
    print(r.status_code)
#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
async def async_pull(url):
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        print(r.status_code)
#
#--------------------------------------------------------------------
#
#--------------------------------------------------------------------
#
if __name__ == "__main__":
    goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
    tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
               'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
               ]
    print("Running asynchronously...")
    async_start = time.time()
    for ticker in tickers:
        url = goog_fin_nyse_url + ticker + ':NYSE'
        asyncio.run(async_pull(url))
    async_end = time.time()
    print(f"Time lapsed is: {async_end - async_start}")
    print("Running synchronously...")
    sync_start = time.time()
    for ticker in tickers:
        url = goog_fin_nyse_url + ticker + ':NYSE'
        sync_pull(url)
    sync_end = time.time()
    print(f"Time lapsed is: {sync_end - sync_start}")
I was hoping the async method would take a fraction of the time the sync method takes. What am I doing wrong?
Posted on 2021-05-27 06:41:40
When you call asyncio.run(async_pull), you are saying "run async_pull and wait for the result to come back". Since you do this once per ticker in a loop, you are effectively using asyncio to run synchronously, so you see no performance benefit.
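The sequential behavior can be seen in isolation without any HTTP at all (a stdlib-only sketch, using asyncio.sleep as a stand-in for the network call): one asyncio.run per coroutine waits for each to finish before starting the next, while gathering them in a single event loop lets the waits overlap.

```python
import asyncio
import time

async def fake_request(delay=0.1):
    # Stand-in for an HTTP call: just waits.
    await asyncio.sleep(delay)

# Sequential: one asyncio.run per coroutine -- each call blocks until done.
start = time.time()
for _ in range(5):
    asyncio.run(fake_request())
sequential = time.time() - start

# Concurrent: one event loop, all five coroutines gathered together.
async def run_all():
    await asyncio.gather(*(fake_request() for _ in range(5)))

start = time.time()
asyncio.run(run_all())
concurrent = time.time() - start

print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
# sequential is roughly 5x the delay; concurrent is roughly 1x the delay
```

This is exactly the shape of the questioner's loop: each asyncio.run is a full stop-and-wait, so the total is the sum of all the individual waits.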
What you need to do is create several async calls and run them concurrently. There are a few ways to do this; the easiest is to use asyncio.gather (see https://docs.python.org/3/library/asyncio-task.html#asyncio.gather), which takes a sequence of coroutines and runs them concurrently. Adapting your code is fairly straightforward: you create an async function that takes the list of urls, calls async_pull on each of them, passes that to asyncio.gather, and awaits the results. Adapting your code to this looks as follows:
import httpx
import asyncio
import time

def sync_pull(url):
    r = httpx.get(url)
    print(r.status_code)

async def async_pull(url):
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        print(r.status_code)

async def async_pull_all(urls):
    return await asyncio.gather(*[async_pull(url) for url in urls])

if __name__ == "__main__":
    goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
    tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
               'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
               ]
    print("Running asynchronously...")
    async_start = time.time()
    results = asyncio.run(async_pull_all([goog_fin_nyse_url + ticker + ':NYSE' for ticker in tickers]))
    async_end = time.time()
    print(f"Time lapsed is: {async_end - async_start}")
    print("Running synchronously...")
    sync_start = time.time()
    for ticker in tickers:
        url = goog_fin_nyse_url + ticker + ':NYSE'
        sync_pull(url)
    sync_end = time.time()
    print(f"Time lapsed is: {sync_end - sync_start}")
Run this way, the async version takes about a second for me, versus about seven seconds running synchronously.
Posted on 2021-08-09 16:10:41
Here's a nice pattern I use (I tweak it a little each time). In general, I make a module async_utils.py and just import the top-level fetching function (e.g. fetch_things here), then my code is free to ignore the internals (other than error handling). You can do it other ways, but I like the 'functional' style of aiostream, and I often find repeated calls to the process function take certain defaults I set up using functools.partial.
You can pass a tqdm.tqdm progress bar as pbar (initialised with a known size, total=len(things)) to be updated as each async response is processed.
import asyncio
import httpx
from aiostream import stream
from functools import partial

__all__ = ["fetch", "process", "async_fetch_urlset", "fetch_things"]

async def fetch(session, url, raise_for_status=False):
    response = await session.get(str(url))
    if raise_for_status:
        response.raise_for_status()
    return response

async def process_thing(data, things, pbar=None, verbose=False):
    # Map the response back to the thing it came from in the things list
    source_url = data.history[0].url if data.history else data.url
    thing = next(t for t in things if source_url == t.get("thing_url"))
    # Handle `data.content` here, where `data` is the `httpx.Response`
    if verbose:
        print(f"Processing {source_url=}")
    thing.update({"computed_value": "result goes here"})
    if pbar:
        pbar.update()

async def async_fetch_urlset(urls, things, pbar=None, verbose=False, timeout_s=10.0):
    timeout = httpx.Timeout(timeout=timeout_s)
    async with httpx.AsyncClient(timeout=timeout) as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=20)
        process = partial(process_thing, things=things, pbar=pbar, verbose=verbose)
        zs = stream.map(ys, process)
        return await zs

def fetch_things(urls, things, pbar=None, verbose=False):
    return asyncio.run(async_fetch_urlset(urls, things, pbar, verbose))
In this example, the input is a list of dicts (with string keys and values), things: list[dict[str,str]], whose key "thing_url" is accessed to retrieve the URL. Having a dict or object rather than just the URL string is helpful when you want to 'map' the result back to the object it came from. The process_thing function is able to modify the input list things in-place (i.e. any changes are not scoped within the function, but change it in the scope that called it).
You will often find errors arise during async runs that you don't get when running synchronously, so you'll need to catch them and retry. A common gotcha is retrying at the wrong level (e.g. around the entire loop).
Specifically, you'll want to import and catch httpcore.ConnectTimeout, httpx.ConnectTimeout, httpx.RemoteProtocolError, and httpx.ReadTimeout.
Increasing the timeout_s parameter will reduce the frequency of timeout errors by letting the AsyncClient 'wait' longer, but doing so may actually slow your program down (it won't 'fail fast' as quickly).
Here's an example of how to use the async_utils module given above:
from async_utils import fetch_things
import httpx
import httpcore

# UNCOMMENT THIS TO SEE ALL THE HTTPX INTERNAL LOGGING
#import logging
#log = logging.getLogger()
#log.setLevel(logging.DEBUG)
#log_format = logging.Formatter('[%(asctime)s] [%(levelname)s] - %(message)s')
#console = logging.StreamHandler()
#console.setLevel(logging.DEBUG)
#console.setFormatter(log_format)
#log.addHandler(console)

things = [
    {"url": "https://python.org", "name": "Python"},
    {"url": "https://www.python-httpx.org/", "name": "HTTPX"},
]
#log.debug("URLSET:" + str(list(t.get("url") for t in things)))

def make_urlset(things):
    """Make a URL generator (empty if all have been fetched)"""
    urlset = (t.get("url") for t in things if "computed_value" not in t)
    return urlset

retryable_errors = (
    httpcore.ConnectTimeout,
    httpx.ConnectTimeout, httpx.RemoteProtocolError, httpx.ReadTimeout,
)

# ASYNCHRONOUS:
max_retries = 100
for i in range(max_retries):
    print(f"Retry {i}")
    try:
        urlset = make_urlset(things)
        foo = fetch_things(urls=urlset, things=things, verbose=True)
    except retryable_errors as exc:
        print(f"Caught {exc!r}")
        if i == max_retries - 1:
            raise
    except Exception:
        raise
    else:
        break  # all URLs fetched successfully, stop retrying

# SYNCHRONOUS:
#for t in things:
#    resp = httpx.get(t["url"])
In this example, once an async response is successfully processed I set a key "computed_value" on the dict, which prevents that URL from being fed into the generator on the next round (when make_urlset is called again). In this way, the generator gets progressively smaller. You can also do it with lists, but I find a generator of the URLs works reliably. For an object, you'd change the dictionary key assignment/access (update/in) to attribute assignment/access (setattr/hasattr).
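The shrinking urlset can be demonstrated without any network calls (a stdlib-only sketch reusing the make_urlset pattern above, with the successful fetches simulated by setting "computed_value" by hand):

```python
things = [
    {"url": "https://python.org", "name": "Python"},
    {"url": "https://www.python-httpx.org/", "name": "HTTPX"},
]

def make_urlset(things):
    """Make a URL generator (empty if all have been fetched)"""
    return (t.get("url") for t in things if "computed_value" not in t)

print(list(make_urlset(things)))  # both URLs still pending

# Simulate the first fetch succeeding:
things[0]["computed_value"] = "done"
print(list(make_urlset(things)))  # only the unfetched URL remains

# Simulate the second fetch succeeding too:
things[1]["computed_value"] = "done"
print(list(make_urlset(things)))  # empty: nothing left to retry
```

Each retry round therefore only re-fetches the URLs that have not yet produced a result, rather than the whole set.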
Posted on 2021-05-28 04:39:19
I wanted to post a working version of the code using futures -- virtually the same run time:
import httpx
import asyncio
import time
#
#--------------------------------------------------------------------
# Synchronous pull
#--------------------------------------------------------------------
#
def sync_pull(url):
    r = httpx.get(url)
    print(r.status_code)
#
#--------------------------------------------------------------------
# Asynchronous Pull
#--------------------------------------------------------------------
#
async def async_pull(url):
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        print(r.status_code)
#
#--------------------------------------------------------------------
# Build tasks queue & execute coroutines
#--------------------------------------------------------------------
#
async def build_task() -> None:
    goog_fin_nyse_url = 'https://www.google.com/finance/quote/'
    tickers = ['F', 'TWTR', 'CVX', 'VZ', 'GME', 'GM', 'PG', 'AAL',
               'MARK', 'AAP', 'THO', 'NGD', 'ZSAN', 'SEAC',
               ]
    tasks = []
    #
    ## The following block of code will create a queue full of
    ## function calls
    for ticker in tickers:
        url = goog_fin_nyse_url + ticker + ':NYSE'
        tasks.append(asyncio.ensure_future(async_pull(url)))
    start_time = time.time()
    #
    ## This block of code will dereference the function calls
    ## from the queue, which will cause them all to run
    ## rapidly
    await asyncio.gather(*tasks)
    #
    ## Calculate time lapsed
    finish_time = time.time()
    elapsed_time = finish_time - start_time
    print(f"\n Time spent processing: {elapsed_time} ")

# Start from here
if __name__ == "__main__":
    asyncio.run(build_task())
https://stackoverflow.com/questions/67713274