我对asyncio还是个新手,还在为如何处理循环中的循环而苦苦挣扎:
import asyncio
import concurrent.futures
import logging
import sys
import time
sub_dict = {
1: ['one', 'commodore', 'apple', 'linux', 'windows'],
2: ['two', 'commodore', 'apple', 'linux', 'windows'],
3: ['three', 'commodore', 'apple', 'linux', 'windows'],
4: ['four', 'commodore', 'apple', 'linux', 'windows'],
5: ['five', 'commodore', 'apple', 'linux', 'windows'],
6: ['six', 'commodore', 'apple', 'linux', 'windows'],
7: ['seven', 'commodore', 'apple', 'linux', 'windows'],
8: ['eight', 'commodore', 'apple', 'linux', 'windows']
}
def blocks(key, value):
for v in value:
log = logging.getLogger('blocks({} {})'.format(key, v))
log.info('running')
log.info('done')
time.sleep(5)
return key, v
async def run_blocking_tasks(executor, sub_dict2):
log = logging.getLogger('run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
loop = asyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, blocks, key, value)
for key, value in sub_dict2.items()
]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
def new_func():
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=8,
)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(
run_blocking_tasks(executor, sub_dict)
)
event_loop.close()
new_func()在这里,您可以看到每个元素的所有值项都被分配到同一线程。例如,元素“1”的所有值都在线程0上。
我知道这是因为我的for v in value循环没有正确地插入asyncio。
我想要的输出是,如果我分配了五个工作线程,元素'1‘的每个值项将在它自己的线程上,编号为0-4,总共给出五个线程。然后,对元素2到元素8重复此操作。
我应该分配40个线程,8个字典元素*每个元素5个值项=每个字典项1个唯一线程。
希望这是合理的..。
发布于 2020-05-12 23:27:29
一些关于问问题的事情似乎总是会在我身上触发额外的智商。答案是这样的,如果有人感兴趣的话:
def blocks(key, v):
#for v in value:
log = logging.getLogger('blocks({} {})'.format(key,v))
log.info('running')
log.info('done')
time.sleep(30)
return v
async def run_blocking_tasks(executor, sub_dict2):
log = logging.getLogger('run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
for key, value in sub_dict2.items():
loop = asyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, blocks, key, v)
for v in value
]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
def new_func():
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
sub_dict2 = dict(list(sub_dict.items())[0:8])
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=5,
)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(
run_blocking_tasks(executor, sub_dict2)
)
event_loop.close()
new_func()编辑:
下面是我现在的concurrent.futures代码的大致布局,如下面的评论线程所示。是一个逻辑顺序的布局,而不是完整的代码,因为这有几个前步骤的功能相当冗长……
#some code here that chunks a bigger dictionary using slicing in a for loop.
#sub_dict a 20 element subset of a bigger dictionary
#slices are parameterised in real code
sub_dict = dict(list(fin_dict.items())[0:20])
#set 20 workers
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
#submit to the executor an enumerator/counter and my sub dict
future_to_pair = {executor.submit(function_name, v, i): (i, v) for i, v in enumerate(sub_dict.items(), 1)}
#await results
for future in concurrent.futures.as_completed(future_to_pair):
pair = future_to_pair[future]
data = future.result()
#function that is being called by concurrent.futures
#am happy for all the v's in value to be on a single thread
#i want each key to be on an individual thread
#this will process 20 keys simultaneously, but wait for the slowest one before clearing
def function_name(sub_dict, i):
for key, value in sub_dict:
for v in value:
# using subprocess, execute some stuff
# dictionary loops provide parameters for the executables.发布于 2020-05-14 09:25:38
我认为你错过了一个关键的概念:等待。没有等待的异步def函数是完全合法的,但却是没有意义的。异步编程的目的是处理这样的情况:您的程序必须等待某些东西,而程序在此期间可以做一些有用的事情。否则,它的实用性很小。*也不容易找到简单的例子来说明这是多么有用。
Python提供了几种类型的并发:进程,它使用多个CPU核心;线程,它在单个进程中使用多行执行;以及异步任务,它在线程中使用多个执行单元。它们可以以不同的方式组合在一起,并具有不同的特征。
线程允许您的程序在等待资源时在一个地方阻塞,但在另一个地方继续执行。但是线程之间的同步通常很棘手,因为在线程之间调度CPU时间是抢占式的。它不在你的直接控制之下。
任务也允许你的程序在一个地方停止,在另一个地方继续,但是在任务之间的切换是合作的。一切都在你的掌控之中。当一个任务遇到"await“表达式时,它会停在那里,并允许另一个任务运行。该任务将继续执行,直到遇到等待表达式,依此类推。如果这解决了你的问题,那就太好了。这是一个很棒的工具。
似乎,通过阅读大量的SO问题,程序员有时会得到这样的印象: asyncio会让他们的程序运行得更快,因为他们会把程序送到某种永远不会消耗CPU周期的地方,在那里执行,结果就会轻而易举地回来。抱歉,那不会发生的。主要的用例就是我所描述的:你必须等待一些事情,但你还有其他的事情要做。
*为了完整性添加了备注:我使用了asyncio的跨线程功能作为线程之间协调的手段。例如,在线程B中创建一个事件循环,并使其使用"call_soon_threadsafe“方法或"run_coroutine_threadsafe”方法根据线程A的需要执行函数。这是一个很方便的功能,即使它不需要使用await表达式。
https://stackoverflow.com/questions/61755419
复制相似问题