首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Asyncio和TQDM Python

Asyncio和TQDM Python
EN

Stack Overflow用户
提问于 2022-09-13 02:02:40
回答 1查看 197关注 0票数 0

我正在尝试为异步函数使用tqdm来制作一个进度条。我在以下网站上试着遵循指南:

异步的tqdm带有tqdm的异步tqdm和coroutines

下面是我的代码,它运行在一个jupyter笔记本上:

代码语言:javascript
复制
import pandas as pd
from pandas_datareader import data as pdr
import asyncio
# import tqdm
from tqdm.asyncio import tqdm


async def get_prices(index, row):
    try:
        prices = pdr.get_data_yahoo(row['Symbol'], row['Start'], row['End'])
    except Exception as e:
        print('Error',e,row['Symbol'],row['Date'])
        return
    prices['Symbol'] = row['Symbol']
    prices['Key'] = index
    return prices

async def get_stock_data(df):
    # data = await asyncio.gather(*[get_prices(index, row) for index, row in df.iterrows()])
    # data = [await f for f in tqdm(asyncio.as_completed([get_prices(index, row) for index, row in df.iterrows()]), total=len(df)]
    flist = [get_prices(index, row) for index, row in df.iterrows()]
    data = [await f for f in tqdm.as_completed(flist, total=len(df))]
    return data


stocks = ['IBM', 'AAPL', 'C', 'ACTG', 'ACVA', 'ACWI', 'ACWX', 'ACXP', 'ADAG', 'ADAL', 'IBET', 'IBEX', 'IBKR', 'IBOC', 'IBRX', 'IBTB', 'IBTD', 'IBTE', 'IBTF', 'IBTG', 'IBTH', 'IBTI', 'IBTJ', 'IBTK', 'IBTL', 'IBTM', 'IBTX', 'ICAD', 'ICCC', 'ICCH', 'ICCM', 'ICFI', 'ICHR', 'ICLK', 'ICLN', 'ICLR', 'ICMB', 'ICPT', 'ICUI', 'ICVX', 'IDAI', 'IDBA', 'IDCC', 'IDEX', 'IDLB', 'IDN', 'IDRA', 'IDXX', 'IDYA', 'IEA', 'IEAWW', 'IEF', 'IEI', 'IEP', 'IESC', 'IEUS', 'IFBD', 'IFGL', 'IFRX', 'IFV', 'IGAC', 'IGACU', 'IGACW', 'IGF', 'IGIB', 'IGIC', 'IGICW', 'IGMS', 'IGNY', 'IGNYU', 'IGNYW', 'IGOV', 'IGSB', 'IGTA', 'IGTAR', 'IGTAU', 'IGTAW', 'IHRT', 'IHYF', 'III', 'IIII', 'IIIIU', 'IIIIW', 'IIIV', 'IINN', 'IINNW', 'IIVI', 'IIVIP', 'IJT', 'IKNA', 'IKT', 'ILAG', 'ILMN', 'ILPT', 'IMAB', 'IMAC', 'IMACW', 'IMAQ', 'IMAQR', 'IMAQU', 'IMAQW', 'IMBI', 'IMBIL', 'IMCC', 'IMCR', 'IMCV', 'IMGN', 'IMGO', 'IMKTA', 'IMMP', 'IMMR', 'IMMX', 'IMNM', 'IMOS', 'IMPL', 'IMPP', 'IMPPP', 'IMRA', 'IMRN', 'IMRX', 'IMTE', 'IMTX', 'IMTXW', 'IMUX', 'IMV', 'IMVT', 'IMXI', 'INAB', 'INBK', 'INBKZ', 'INBX', 'INCR', 'INCY', 'INDB', 'INDI', 'INDIW', 'INDP', 'INDT', 'INDY', 'INFI', 'INFN', 'INGN', 'INKA', 'INKAU', 'INKAW', 'INKT', 'INM', 'INMB', 'INMD', 'INNV', 'INO', 'INOD', 'INPX', 'INSE', 'INSG', 'INSM', 'INTA', 'INTC', 'INTE', 'INTEU', 'INTEW', 'INTG', 'INTR', 'INTU', 'INTZ', 'INVA', 'INVE', 'INVO', 'INVZ', 'INVZW', 'INZY', 'IOAC', 'IOACU', 'IOACW', 'IOBT', 'IONM', 'IONR', 'IONS', 'IOSP', 'IOVA', 'IPA', 'IPAR', 'IPAX', 'IPAXU', 'IPAXW', 'IPDN', 'IPGP', 'IPHA', 'IPKW', 'IPSC', 'IPVI', 'IPVIU', 'IPVIW', 'IPW', 'IPWR', 'IPX', 'IQ', 'IQMD', 'IQMDU', 'IQMDW', 'IRAA', 'IRAAU', 'IRAAW', 'IRBT', 'IRDM', 'IREN', 'IRIX', 'IRMD', 'IROQ', 'IRTC', 'IRWD', 'ISAA', 'ISDX', 'ISEE']  
df_stocks = pd.DataFrame(stocks, columns=['Symbol'])
df_stocks['Start'] = '9/1/2022'
df_stocks['End'] = '9/11/2022'

data = pd.concat([d for d in await get_stock_data(df_stocks)])
data.dropna(inplace=True)
data.to_csv('../output/stockprices.csv', sep='\t')
data

代码工作,但进度条不工作。我得到的输出是不变的:

代码语言:javascript
复制
0%|          | 0/214 [00:00<?, ?it/s]

我也尝试过from tqdm.autonotebook import tqdm,但结果是一样的。

我肯定我在做一些愚蠢的事情,但我自己解决不了这个问题。

EN

回答 1

Stack Overflow用户

发布于 2022-09-13 19:28:05

TL;DR

问题是异步函数get_prices()没有任何https://docs.python.org/3/library/asyncio-task.html#awaitables,因此它不是异步的,而是同步的。在阅读其余部分之前,您肯定希望了解Coroutine是什么。

由于该库pandas-datareader没有定义任何异步函数--您最好将pdr.get_data_xxx卸载到线程或放弃并发性,并使用同步的tqdm。

解释

就问题而言,您所写的内容可以简化如下:

代码语言:javascript
复制
import asyncio
import random
import time
from tqdm.asyncio import tqdm


async def fake_async_task():
    time.sleep(0.5 + random.random())  # <- notice there is no await & awaitable!


async def main():
    tasks = [fake_async_task() for _ in range(10)]
    _ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]


# asyncio.run(main()) --> when not in Jupyter
await main() # --> in Jupyter

如果您运行它,您会注意到,在它结束之前,它似乎也会将进度从0%跳到100%,并且执行的总时间要比0.5 +α秒长得多。

代码语言:javascript
复制
  0%|          | 0/10 [00:00<?, ?it/s]

其背后的原因非常复杂;抛出一个假异步函数--也就是不带await的异步函数--超出了文档中的用法,因此需要在库代码中查看。

过分简化流程--以牺牲准确性和恰当的术语为代价:

  1. 我们给tqdm.asyncio.tqdm.as_completed喂食10个tqdm.asyncio.tqdm.as_completed协同素。
  2. tqdm.asyncio.tqdm.as_completed只是asyncio.as_completed的一个包装器,所以tqdm传递给它所有的都是可以的,然后等待任何结果。
  3. asyncio.as_completed调度执行所有给定的Awaitable,,然后调度Awaitable命名为_wait_for_one以获得结果。
  4. 第一个计划好的fake_async_task()开始运行,直到遇到下一个await关键字。
  5. 但是我们还没有添加任何await关键字,所以coroutine最终会不停地运行。
  6. 其他9个计划好的fake_async_task()也会发生同样的情况,而_wait_for_one仍在耐心地等待轮到它。
  7. 当最终轮到_wait_for_one时,所有的任务都已经完成了,因此产生的结果发生得比人类所能看到的要快,进度条的进度变化也是如此。

这就是为什么总执行时间是addictive的原因,它在执行过程中从未真正存档任何并发性。

运行像fake_async_task()这样的函数就是tqdm作者和asyncio都没有想到的。其他人通常会编写类似这样的代码:

代码语言:javascript
复制
import asyncio
import random
from tqdm.asyncio import tqdm


async def task():
    await asyncio.sleep(0.5 + random.random()) # <- await + something that's awaitable
    # adding random val to prevent it finishing altogether,
    # or progress bar will seemingly jump from 0 to 100 again.


async def main():
    tasks = [task() for _ in range(10)]
    _ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]


# asyncio.run(main())
await main()

然后打印出进度--更像是,足够慢,这样我们就能看到--就像我们想要的那样。同时,总执行时间为0.5 +α秒,实现了适当的并发性。

代码语言:javascript
复制
 30%|███       | 3/10 [00:00<00:01,  3.98it/s]

备选方案

但是,如果您想要使用的函数碰巧没有async变体,但是它是很高兴的不是CPU密集型,而是I/O密集型,那么您可以在使用异步API时将它卸载到另一个线程以实现并发。

代码语言:javascript
复制
import asyncio
import random
import time
from tqdm.asyncio import tqdm


def io_intensive_sync_task():
    time.sleep(0.5 + random.random())

async def main():
    tasks = [asyncio.to_thread(io_intensive_sync_task) for _ in range(10)]
    _ = [await task_ for task_ in tqdm.as_completed(tasks, total=len(tasks))]
    # can skip the total param as tqdm internally use len if not provided


# asyncio.run(main())
await main()

它将像前面的示例一样运行。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73696826

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档