我有一个流,当我收到这个流上的消息时,我想运行一个函数。
async def some_func():
asyncio.sleep(5)
print("hello world")
client = create_client('wax.dfuse.eosnation.io:9000')
stream = client.Execute(Request(query = OPERATION_EOS))
for rawRequest in stream:
async.gather(some_func())如果同时有2条或更多条消息,我想要2个或更多并行运行的函数。
当前,此脚本不运行函数。
我只需要一种独立于主函数运行函数的方法。
发布于 2022-08-26 00:36:25
不知怎么我让它起作用了。
我的代码:
async def some_func(rawResult):
# There is some code
async def stream_eosio(loop):
for rawResult in stream:
asyncio.run_coroutine_threadsafe(some_func(rawResult), loop)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
Thread(target=asyncio.run, args=(stream_eosio(loop),)).start()
loop.run_forever()缺点:您不能停止使用Ctrl +Z或Ctrl + C的脚本,因为线程。
优点:有点ez。
发布于 2022-08-23 22:16:25
代码示例:
import asyncio
import time
chain = ""
sum = 0
async def myproc(callid):
global chain
global sum
print(f"myProc {callid} started ...")
t1 = time.perf_counter()
time.sleep(2.5)
chain = chain + "->" + str(callid)
sum = sum + 1
await asyncio.sleep(5)
print("hello world")
t = time.perf_counter() - t1
print(f" myProc {callid} finished in {t:0.5f} seconds. sum = {sum} chain {chain}")
async def main():
#client = create_client('wax.dfuse.eosnation.io:9000')
#stream = client.Execute(Request(query = OPERATION_EOS))
stream = range(10) # # simulation of the task aka each eelment from stream
coros = [myproc(rawRequest) for rawRequest in stream]
await asyncio.gather(*coros)
if __name__ == "__main__":
start_sec = time.perf_counter()
await main() # # for notebook how does work, for python interpreter use asyncio.run(main())
elapsed_secs = time.perf_counter() - start_sec
print(f"Job finished in {elapsed_secs:0.5f} seconds.")输出:
myProc 0 started ...
myProc 1 started ...
myProc 2 started ...
myProc 3 started ...
myProc 4 started ...
myProc 5 started ...
myProc 6 started ...
myProc 7 started ...
myProc 8 started ...
myProc 9 started ...
hello world
myProc 0 finished in 25.02580 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 1 finished in 22.52303 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 2 finished in 20.02011 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 3 finished in 17.51737 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 4 finished in 15.01457 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 5 finished in 12.51187 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 6 finished in 10.00907 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 7 finished in 7.50854 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 8 finished in 7.50605 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
hello world
myProc 9 finished in 7.50515 seconds. sum = 10 chain ->0->1->2->3->4->5->6->7->8->9
Job finished in 30.02882 seconds.要获得详细的解释,您可以查看以下非常好的解释,说明程序中各个函数的异步执行,而不是并行化处理,因此,与使用不高效的线程并行执行不同,您可以利用一个组合,而asyncio.gather(*coros)不需要定义线程并增加基础设施Python - Make数据管道尖叫中的异步处理就可以以并行格式运行它。考虑使用asyncio.run()函数而不是使用较低级别的函数手动创建和关闭事件循环,我在注释中确实指出了这一点,但是对于我来说是额外的循环,对于我来说,在默认情况下已经运行的笔记本中,高级API用于协同线 run 更好地处理所有和需要使用“事件循环”格式的执行这些协同值,而不是简单地调用coroutines ()(这是对更好地理解所有这些API的一点无意义的解释)。
注意:我确实使用来执行它,所以如果您确实使用python解释器,那么就使用asyncio.run( main() )代替等待main(),我们在这里使用异步处理来模拟并行处理,而不是执行通常很难完成的真正的并行处理,并且不适合您的流作业。
发布于 2022-08-21 01:30:44
用穿线你可以做到
import threading
thread1 = threading.Thread(target=fn)
thread2 = threading.Thread(target=fn)
thread1.start()
thread2.start()
thread1.join()
thread2.join()https://stackoverflow.com/questions/73431087
复制相似问题