我正在运行一个程序,它对一个dataframe中的每一行进行相同的API调用。由于花费了相当长的时间,我决定尝试学习并实现一个使用asyincio的异步版本。
我将数据分割成"N“(在本例中为3),更小的数据格式,而对于每一个数据,我将创建一个协同线,将由主例程收集和等待。
以下是我尝试过的:
async def get_next_funding(df):
for idx in df.index:
predicted_rate = ftx.get_future_stats(df.loc[idx]['name'])['nextFundingRate']
df.at[idx, 'nextFundingRate'] = predicted_rate
await asyncio.sleep(0.01)
return df
async def await_for_df(df_):
# await é quem promete que a funcao vai ser executada, dando a mesma
# para o event loop
await get_next_funding(df_)
return df_
async def main():
# array_split returns a LIST of dataframes: [df1, ..., dfN]
dfs = np.array_split(ftx.df, 3)
# Could I use [await_for_df(dfs[x]) for x in dfs] ?
results = await asyncio.gather(await_for_df(dfs[0]), await_for_df(dfs[1]), await_for_df(dfs[2]))
loop = asyncio.get_event_loop()
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print('process finished in {} seconds'.format(end - start))它可以工作,但它似乎没有并行运行,因为它需要与我的同步代码相同的时间。我觉得函数ftx.get_future_stats()可能阻塞了一切。这样的函数是一个标准的API调用( https://docs.ftx.com/#get-future-stats )。
我错过了什么?
发布于 2022-01-12 13:33:53
因此,我需要将阻塞函数异步get_next_funding(df)作为loop.run_in_executor()内部的同步函数来执行,因为阻塞fcn不是异步类型。
谢谢@gold_cy的回答!
修改后的代码如下:
def get_next_funding(df):
for idx in df.index:
predicted_rate = ftx.get_future_stats(df.loc[idx]['name'])['nextFundingRate']
df.at[idx, 'nextFundingRate'] = predicted_rate
return df
async def await_for_df(df_):
loop = asyncio.get_running_loop()
r = await loop.run_in_executor(None, get_next_funding, df_)
return rhttps://stackoverflow.com/questions/70681195
复制相似问题