首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Python嵌套循环和异步

Python嵌套循环和异步
EN

Stack Overflow用户
提问于 2020-05-12 23:18:10
回答 2查看 478关注 0票数 0

我对asyncio还是个新手,还在为如何处理循环中的循环而苦苦挣扎:

代码语言:javascript
复制
import asyncio
import concurrent.futures
import logging
import sys
import time

sub_dict = {
    1: ['one', 'commodore', 'apple', 'linux', 'windows'],
    2: ['two', 'commodore', 'apple', 'linux', 'windows'],
    3: ['three', 'commodore', 'apple', 'linux', 'windows'],
    4: ['four', 'commodore', 'apple', 'linux', 'windows'],
    5: ['five', 'commodore', 'apple', 'linux', 'windows'],
    6: ['six', 'commodore', 'apple', 'linux', 'windows'],
    7: ['seven', 'commodore', 'apple', 'linux', 'windows'],
    8: ['eight', 'commodore', 'apple', 'linux', 'windows']
}


def blocks(key, value):


    for v in value:

        log = logging.getLogger('blocks({} {})'.format(key, v))
        log.info('running')

        log.info('done')

        time.sleep(5)



    return key, v


async def run_blocking_tasks(executor, sub_dict2):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')


    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, key, value)


        for key, value in sub_dict2.items()
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')


def new_func():


    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=8,
    )

    event_loop = asyncio.get_event_loop()

    event_loop.run_until_complete(
    run_blocking_tasks(executor, sub_dict)
    )

    event_loop.close()


new_func()

在这里,您可以看到每个元素的所有值项都被分配到同一线程。例如,元素“1”的所有值都在线程0上。

我知道这是因为我的for v in value循环没有正确地插入asyncio。

我想要的输出是,如果我分配了五个工作线程,元素'1‘的每个值项将在它自己的线程上,编号为0-4,总共给出五个线程。然后,对元素2到元素8重复此操作。

我应该分配40个线程,8个字典元素*每个元素5个值项=每个字典项1个唯一线程。

希望这是合理的..。

EN

回答 2

Stack Overflow用户

发布于 2020-05-12 23:27:29

一些关于问问题的事情似乎总是会在我身上触发额外的智商。答案是这样的,如果有人感兴趣的话:

代码语言:javascript
复制
def blocks(key, v):


    #for v in value:

    log = logging.getLogger('blocks({} {})'.format(key,v))
    log.info('running')

    log.info('done')

    time.sleep(30)



    return v


async def run_blocking_tasks(executor, sub_dict2):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')

    for key, value in sub_dict2.items():

        loop = asyncio.get_event_loop()
        blocking_tasks = [
        loop.run_in_executor(executor, blocks, key, v)


        for v in value
        ]

        log.info('waiting for executor tasks')
        completed, pending = await asyncio.wait(blocking_tasks)
        results = [t.result() for t in completed]
        log.info('results: {!r}'.format(results))

        log.info('exiting')


def new_func():


    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    sub_dict2 = dict(list(sub_dict.items())[0:8])

    executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=5,
    )

    event_loop = asyncio.get_event_loop()

    event_loop.run_until_complete(
    run_blocking_tasks(executor, sub_dict2)
    )

    event_loop.close()


new_func()

编辑:

下面是我现在的concurrent.futures代码的大致布局,如下面的评论线程所示。是一个逻辑顺序的布局,而不是完整的代码,因为这有几个前步骤的功能相当冗长……

代码语言:javascript
复制
#some code here that chunks a bigger dictionary using slicing in a for loop.
#sub_dict a 20 element subset of a bigger dictionary
#slices are parameterised in real code

sub_dict = dict(list(fin_dict.items())[0:20])

#set 20 workers                    
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:                 

    #submit to the executor an enumerator/counter and my sub dict
    future_to_pair = {executor.submit(function_name, v, i): (i, v) for i, v in enumerate(sub_dict.items(), 1)}

    #await results                       
    for future in concurrent.futures.as_completed(future_to_pair):

        pair = future_to_pair[future]          
        data = future.result()


#function that is being called by concurrent.futures
#am happy for all the v's in value to be on a single thread
#i want each key to be on an individual thread
#this will process 20 keys simultaneously, but wait for the slowest one before clearing

def function_name(sub_dict, i):

    for key, value in sub_dict:

        for v in value:

            # using subprocess, execute some stuff
            # dictionary loops provide parameters for the executables.
票数 0
EN

Stack Overflow用户

发布于 2020-05-14 09:25:38

我认为你错过了一个关键的概念:等待。没有等待的异步def函数是完全合法的,但却是没有意义的。异步编程的目的是处理这样的情况:您的程序必须等待某些东西,而程序在此期间可以做一些有用的事情。否则,它的实用性很小。*也不容易找到简单的例子来说明这是多么有用。

Python提供了几种类型的并发:进程,它使用多个CPU核心;线程,它在单个进程中使用多行执行;以及异步任务,它在线程中使用多个执行单元。它们可以以不同的方式组合在一起,并具有不同的特征。

线程允许您的程序在等待资源时在一个地方阻塞,但在另一个地方继续执行。但是线程之间的同步通常很棘手,因为在线程之间调度CPU时间是抢占式的。它不在你的直接控制之下。

任务也允许你的程序在一个地方停止,在另一个地方继续,但是在任务之间的切换是合作的。一切都在你的掌控之中。当一个任务遇到"await“表达式时,它会停在那里,并允许另一个任务运行。该任务将继续执行,直到遇到等待表达式,依此类推。如果这解决了你的问题,那就太好了。这是一个很棒的工具。

似乎,通过阅读大量的SO问题,程序员有时会得到这样的印象: asyncio会让他们的程序运行得更快,因为他们会把程序送到某种永远不会消耗CPU周期的地方,在那里执行,结果就会轻而易举地回来。抱歉,那不会发生的。主要的用例就是我所描述的:你必须等待一些事情,但你还有其他的事情要做。

*为了完整性添加了备注:我使用了asyncio的跨线程功能作为线程之间协调的手段。例如,在线程B中创建一个事件循环,并使其使用"call_soon_threadsafe“方法或"run_coroutine_threadsafe”方法根据线程A的需要执行函数。这是一个很方便的功能,即使它不需要使用await表达式。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61755419

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档