我正在尝试为异步函数使用tqdm来制作一个进度条。我在以下网站上试着遵循指南:
异步的tqdm、带有tqdm的异步和tqdm和coroutines
下面是我的代码,它运行在一个jupyter笔记本上:
import pandas as pd
from pandas_datareader import data as pdr
import asyncio
# import tqdm
from tqdm.asyncio import tqdm
async def get_prices(index, row):
try:
prices = pdr.get_data_yahoo(row['Symbol'], row['Start'], row['End'])
except Exception as e:
print('Error',e,row['Symbol'],row['Date'])
return
prices['Symbol'] = row['Symbol']
prices['Key'] = index
return prices
async def get_stock_data(df):
# data = await asyncio.gather(*[get_prices(index, row) for index, row in df.iterrows()])
# data = [await f for f in tqdm(asyncio.as_completed([get_prices(index, row) for index, row in df.iterrows()]), total=len(df)]
flist = [get_prices(index, row) for index, row in df.iterrows()]
data = [await f for f in tqdm.as_completed(flist, total=len(df))]
return data
stocks = ['IBM', 'AAPL', 'C', 'ACTG', 'ACVA', 'ACWI', 'ACWX', 'ACXP', 'ADAG', 'ADAL', 'IBET', 'IBEX', 'IBKR', 'IBOC', 'IBRX', 'IBTB', 'IBTD', 'IBTE', 'IBTF', 'IBTG', 'IBTH', 'IBTI', 'IBTJ', 'IBTK', 'IBTL', 'IBTM', 'IBTX', 'ICAD', 'ICCC', 'ICCH', 'ICCM', 'ICFI', 'ICHR', 'ICLK', 'ICLN', 'ICLR', 'ICMB', 'ICPT', 'ICUI', 'ICVX', 'IDAI', 'IDBA', 'IDCC', 'IDEX', 'IDLB', 'IDN', 'IDRA', 'IDXX', 'IDYA', 'IEA', 'IEAWW', 'IEF', 'IEI', 'IEP', 'IESC', 'IEUS', 'IFBD', 'IFGL', 'IFRX', 'IFV', 'IGAC', 'IGACU', 'IGACW', 'IGF', 'IGIB', 'IGIC', 'IGICW', 'IGMS', 'IGNY', 'IGNYU', 'IGNYW', 'IGOV', 'IGSB', 'IGTA', 'IGTAR', 'IGTAU', 'IGTAW', 'IHRT', 'IHYF', 'III', 'IIII', 'IIIIU', 'IIIIW', 'IIIV', 'IINN', 'IINNW', 'IIVI', 'IIVIP', 'IJT', 'IKNA', 'IKT', 'ILAG', 'ILMN', 'ILPT', 'IMAB', 'IMAC', 'IMACW', 'IMAQ', 'IMAQR', 'IMAQU', 'IMAQW', 'IMBI', 'IMBIL', 'IMCC', 'IMCR', 'IMCV', 'IMGN', 'IMGO', 'IMKTA', 'IMMP', 'IMMR', 'IMMX', 'IMNM', 'IMOS', 'IMPL', 'IMPP', 'IMPPP', 'IMRA', 'IMRN', 'IMRX', 'IMTE', 'IMTX', 'IMTXW', 'IMUX', 'IMV', 'IMVT', 'IMXI', 'INAB', 'INBK', 'INBKZ', 'INBX', 'INCR', 'INCY', 'INDB', 'INDI', 'INDIW', 'INDP', 'INDT', 'INDY', 'INFI', 'INFN', 'INGN', 'INKA', 'INKAU', 'INKAW', 'INKT', 'INM', 'INMB', 'INMD', 'INNV', 'INO', 'INOD', 'INPX', 'INSE', 'INSG', 'INSM', 'INTA', 'INTC', 'INTE', 'INTEU', 'INTEW', 'INTG', 'INTR', 'INTU', 'INTZ', 'INVA', 'INVE', 'INVO', 'INVZ', 'INVZW', 'INZY', 'IOAC', 'IOACU', 'IOACW', 'IOBT', 'IONM', 'IONR', 'IONS', 'IOSP', 'IOVA', 'IPA', 'IPAR', 'IPAX', 'IPAXU', 'IPAXW', 'IPDN', 'IPGP', 'IPHA', 'IPKW', 'IPSC', 'IPVI', 'IPVIU', 'IPVIW', 'IPW', 'IPWR', 'IPX', 'IQ', 'IQMD', 'IQMDU', 'IQMDW', 'IRAA', 'IRAAU', 'IRAAW', 'IRBT', 'IRDM', 'IREN', 'IRIX', 'IRMD', 'IROQ', 'IRTC', 'IRWD', 'ISAA', 'ISDX', 'ISEE']
df_stocks = pd.DataFrame(stocks, columns=['Symbol'])
df_stocks['Start'] = '9/1/2022'
df_stocks['End'] = '9/11/2022'
data = pd.concat([d for d in await get_stock_data(df_stocks)])
data.dropna(inplace=True)
data.to_csv('../output/stockprices.csv', sep='\t')
data代码工作,但进度条不工作。我得到的输出是不变的:
0%| | 0/214 [00:00<?, ?it/s]我也尝试过from tqdm.autonotebook import tqdm,但结果是一样的。
我肯定我在做一些愚蠢的事情,但我自己解决不了这个问题。
发布于 2022-09-13 19:28:05
TL;DR
问题是异步函数get_prices()没有任何https://docs.python.org/3/library/asyncio-task.html#awaitables,因此它不是异步的,而是同步的。在阅读其余部分之前,您肯定希望了解Coroutine是什么。
由于该库pandas-datareader没有定义任何异步函数--您最好将pdr.get_data_xxx卸载到线程或放弃并发性,并使用同步的tqdm。
解释
就问题而言,您所写的内容可以简化如下:
import asyncio
import random
import time
from tqdm.asyncio import tqdm
async def fake_async_task():
time.sleep(0.5 + random.random()) # <- notice there is no await & awaitable!
async def main():
tasks = [fake_async_task() for _ in range(10)]
_ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]
# asyncio.run(main()) --> when not in Jupyter
await main() # --> in Jupyter如果您运行它,您会注意到,在它结束之前,它似乎也会将进度从0%跳到100%,并且执行的总时间要比0.5 +α秒长得多。
0%| | 0/10 [00:00<?, ?it/s]其背后的原因非常复杂;抛出一个假异步函数--也就是不带await的异步函数--超出了文档中的用法,因此需要在库代码中查看。
过分简化流程--以牺牲准确性和恰当的术语为代价:
tqdm.asyncio.tqdm.as_completed喂食10个tqdm.asyncio.tqdm.as_completed协同素。tqdm.asyncio.tqdm.as_completed只是asyncio.as_completed的一个包装器,所以tqdm传递给它所有的都是可以的,然后等待任何结果。asyncio.as_completed调度执行所有给定的Awaitable,,然后调度Awaitable命名为_wait_for_one以获得结果。fake_async_task()开始运行,直到遇到下一个await关键字。await关键字,所以coroutine最终会不停地运行。fake_async_task()也会发生同样的情况,而_wait_for_one仍在耐心地等待轮到它。_wait_for_one时,所有的任务都已经完成了,因此产生的结果发生得比人类所能看到的要快,进度条的进度变化也是如此。这就是为什么总执行时间是addictive的原因,它在执行过程中从未真正存档任何并发性。
运行像fake_async_task()这样的函数就是tqdm作者和asyncio都没有想到的。其他人通常会编写类似这样的代码:
import asyncio
import random
from tqdm.asyncio import tqdm
async def task():
await asyncio.sleep(0.5 + random.random()) # <- await + something that's awaitable
# adding random val to prevent it finishing altogether,
# or progress bar will seemingly jump from 0 to 100 again.
async def main():
tasks = [task() for _ in range(10)]
_ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]
# asyncio.run(main())
await main()然后打印出进度--更像是,足够慢,这样我们就能看到--就像我们想要的那样。同时,总执行时间为0.5 +α秒,实现了适当的并发性。
30%|███ | 3/10 [00:00<00:01, 3.98it/s]备选方案
但是,如果您想要使用的函数碰巧没有async变体,但是它是很高兴的不是CPU密集型,而是I/O密集型,那么您可以在使用异步API时将它卸载到另一个线程以实现并发。
import asyncio
import random
import time
from tqdm.asyncio import tqdm
def io_intensive_sync_task():
time.sleep(0.5 + random.random())
async def main():
tasks = [asyncio.to_thread(io_intensive_sync_task) for _ in range(10)]
_ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]
# can skip the total param as tqdm internally use len if not provided
# asyncio.run(main())
await main()它将像前面的示例一样运行。
https://stackoverflow.com/questions/73696826
复制相似问题