首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用paralel Python运行函数

用paralel Python运行函数
EN

Stack Overflow用户
提问于 2022-08-21 01:19:23
回答 3查看 74关注 0票数 0

我有一个流,当我收到这个流上的消息时,我想运行一个函数。

代码语言:javascript
复制
  async def some_func():
    asyncio.sleep(5)
    print("hello world")

  client = create_client('wax.dfuse.eosnation.io:9000')
  stream = client.Execute(Request(query = OPERATION_EOS))

  for rawRequest in stream:
    async.gather(some_func())

如果同时有2条或更多条消息,我想要2个或更多并行运行的函数。

当前,此脚本不运行函数。

我只需要一种独立于主函数运行函数的方法。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2022-08-26 00:36:25

不知怎么我让它起作用了。

我的代码:

代码语言:javascript
复制
async def some_func(rawResult):
  # There is some code

async def stream_eosio(loop):
  for rawResult in stream:
    asyncio.run_coroutine_threadsafe(some_func(rawResult), loop)

if __name__ == "__main__":
  loop = asyncio.new_event_loop()
  Thread(target=asyncio.run, args=(stream_eosio(loop),)).start()
  loop.run_forever()

缺点:您不能停止使用Ctrl +Z或Ctrl + C的脚本,因为线程。

优点:有点ez。

票数 0
EN

Stack Overflow用户

发布于 2022-08-23 22:16:25

代码示例:

代码语言:javascript
复制
import asyncio
import time
chain = ""
sum = 0

async def myproc(callid):
    global chain
    global sum
    print(f"myProc {callid} started ...")
    t1 = time.perf_counter()
    time.sleep(2.5)
    chain = chain + "->" + str(callid)
    sum = sum + 1
    await asyncio.sleep(5)
    print("hello world")
    t = time.perf_counter() - t1
    print(f"   myProc {callid} finished in {t:0.5f} seconds. sum = {sum} chain {chain}")

async def main():
    #client = create_client('wax.dfuse.eosnation.io:9000')
    #stream = client.Execute(Request(query = OPERATION_EOS))
    stream = range(10) # # simulation of the task aka each eelment from stream
    coros = [myproc(rawRequest) for rawRequest in stream]    
    await asyncio.gather(*coros)
    
if __name__ == "__main__":
    start_sec = time.perf_counter()
    await main() # # for notebook how does work, for python interpreter use asyncio.run(main())
    
    elapsed_secs = time.perf_counter() - start_sec
    print(f"Job finished in {elapsed_secs:0.5f} seconds.")

输出:

代码语言:javascript
复制
myProc 0 started ...
myProc 1 started ...
myProc 2 started ...
myProc 3 started ...
myProc 4 started ...
myProc 5 started ...
myProc 6 started ...
myProc 7 started ...
myProc 8 started ...
myProc 9 started ...
hello world
   myProc 0 finished in 25.02580 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 1 finished in 22.52303 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 2 finished in 20.02011 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 3 finished in 17.51737 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 4 finished in 15.01457 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 5 finished in 12.51187 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 6 finished in 10.00907 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 7 finished in 7.50854 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 8 finished in 7.50605 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
   myProc 9 finished in 7.50515 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
Job finished in 30.02882 seconds.

要获得详细的解释,您可以查看以下非常好的解释,说明程序中各个函数的异步执行,而不是并行化处理,因此,与使用不高效的线程并行执行不同,您可以利用一个组合,而asyncio.gather(*coros)不需要定义线程并增加基础设施Python - Make数据管道尖叫中的异步处理就可以以并行格式运行它。考虑使用asyncio.run()函数而不是使用较低级别的函数手动创建和关闭事件循环,我在注释中确实指出了这一点,但是对于我来说是额外的循环,对于我来说,在默认情况下已经运行的笔记本中,高级API用于协同线 run 更好地处理所有和需要使用“事件循环”格式的执行这些协同值,而不是简单地调用coroutines ()(这是对更好地理解所有这些API的一点无意义的解释)。

注意:我确实使用来执行它,所以如果您确实使用python解释器,那么就使用asyncio.run( main() )代替等待main(),我们在这里使用异步处理来模拟并行处理,而不是执行通常很难完成的真正的并行处理,并且不适合您的流作业。

票数 1
EN

Stack Overflow用户

发布于 2022-08-21 01:30:44

穿线你可以做到

代码语言:javascript
复制
import threading

thread1 = threading.Thread(target=fn)
thread2 = threading.Thread(target=fn)

thread1.start()
thread2.start()

thread1.join()
thread2.join()
票数 -1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73431087

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档