首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多处理的Asyncio :生产者-消费者模型

多处理的Asyncio :生产者-消费者模型
EN

Stack Overflow用户
提问于 2021-01-26 16:37:16
回答 1查看 736关注 0票数 1

我正在试图收回股票价格,并处理价格,因为他们来了。我是并发的初学者,但我认为这种设置似乎适合异步生产者-消费者模型,在这种模型中,每个生产者检索一个股票价格,并将其传递给消费者。现在,由于工作是CPU密集型的,消费者已经并行(多处理)进行了股票价格处理。因此,我已经有多个使用者在工作,而不是所有的生产者都完成了数据的检索。此外,我还想实施一个步骤,如果消费者发现它正在进行的股票价格是无效的,我们将为该股票生成一个新的消费者工作。

到目前为止,我有下面的玩具代码,它可以让我达到目的,但是我的process_data函数(消费者)有问题。

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor
import asyncio
import random
import time
random.seed(444)

#producers
async def retrieve_data(ticker, q):
    '''
    Pretend we're using aiohttp to retrieve stock prices from a URL
    Place a tuple of stock ticker and price into asyn queue as it becomes available
    '''
    start = time.perf_counter() # start timer
    await asyncio.sleep(random.randint(4, 8)) # pretend we're calling some URL
    price = random.randint(1, 100) # pretend this is the price we retrieved
    print(f'{ticker} : {price} retrieved in {time.perf_counter() - start:0.1f} seconds') 
    await q.put((ticker, price)) # place the price into the asyncio queue
    

#consumers
async def process_data(q):
    while True:
        data = await q.get()
        print(f"processing: {data}")
        with ProcessPoolExecutor() as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor, data_processor, data)
            #if output of data_processing failed, send ticker back to queue to retrieve data again
            if not result[2]: 
                print(f'{result[0]} data invalid. Retrieving again...')
                await retrieve_data(result[0], q) # add a new task
                q.task_done() # end this task
            else:
                q.task_done() # so that q.join() knows when the task is done
            
async def main(tickers):       
    q = asyncio.Queue()
    producers = [asyncio.create_task(retrieve_data(ticker, q)) for ticker in tickers]
    consumers = [asyncio.create_task(process_data(q))]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too. blocks until all items in the queue have been received and processed
    for c in consumers:
        c.cancel() #cancel the consumer tasks, which would otherwise hang up and wait endlessly for additional queue items to appear
    

    
'''
RUN IN JUPYTER NOTEBOOK
'''
start = time.perf_counter()
tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
await main(tickers)
print(f'total elapsed time: {time.perf_counter() - start:0.2f}')

'''
RUN IN TERMINAL
'''
# if __name__ == "__main__":
#     start = time.perf_counter()
#     tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
#     asyncio.run(main(tickers))
#     print(f'total elapsed time: {time.perf_counter() - start:0.2f}')

下面的data_processor()函数,由上面的process_data()调用,需要位于木星笔记本中的一个不同的单元中,或者一个单独的模块(据我所理解,以避免使用PicklingError)。

代码语言:javascript
复制
from multiprocessing import current_process

def data_processor(data):
    ticker = data[0]
    price = data[1]
    
    print(f'Started {ticker} - {current_process().name}')
    start = time.perf_counter() # start time counter
    time.sleep(random.randint(4, 5)) # mimic some random processing time
    
    # pretend we're processing the price. Let the processing outcome be invalid if the price is an odd number
    if price % 2==0:
        is_valid = True
    else:
        is_valid = False
    
    print(f"{ticker}'s price {price} validity: --{is_valid}--"
          f' Elapsed time: {time.perf_counter() - start:0.2f} seconds')
    return (ticker, price, is_valid)

THE ISSUES

  1. 没有使用python的多处理模块,而是使用了concurrent.futures‘ProcessPoolExecutor,它与异步(What kind of problems (if any) would there be combining asyncio with multiprocessing?)兼容。但是,我似乎必须在检索执行器调用的函数的输出(result)和能够并行运行多个子进程之间做出选择。使用下面的构造,子进程按顺序运行,而不是并行运行。

以ProcessPoolExecutor()作为执行器:循环= asyncio.get_running_loop()结果=等待loop.run_in_executor(executor,data_processor,data)

result = await前面删除loop.run_in_executor(executor, data_processor, data)允许并行运行多个使用者,但是我不能从父进程收集他们的结果。我需要await。当然,剩下的代码块也会失败。

如何使这些子进程并行运行并提供输出?也许它需要一种不同于生产者-消费者模型的结构或其他东西。

  1. 代码中要求再次检索无效股票价格的部分工作正常(前提是我可以从上面得到结果),但是它在调用它的子流程中运行,并阻止新的消费者被创建,直到请求得到满足。有办法解决这个问题吗?

#如果data_processing的输出失败,请将滴答器发送回队列,以便再次检索数据(如果不是result2: print(f'{result} data )无效。再次检索.‘)等待retrieve_data(结果,q) #添加一个新任务q.task_done() #结束此任务: q.task_done() #,以便q.join()知道任务何时完成

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-01-26 21:00:11

,但似乎我必须在检索执行器调用的函数的输出(结果)和能够并行运行几个子进程之间做出选择。

幸运的是,情况并非如此,您还可以使用asyncio.gather()一次等待多个项目。但是您从队列中逐个获得数据项,因此您没有一批要处理的项。最简单的解决方案就是启动多个消费者。替换

代码语言:javascript
复制
# the single-element list looks suspicious anyway
consumers = [asyncio.create_task(process_data(q))]

通过以下方式:

代码语言:javascript
复制
# now we have an actual list
consumers = [asyncio.create_task(process_data(q)) for _ in range(16)]

每个使用者都会等待一个单独的任务完成,但这是可以的,因为您将有一个完整的池并行工作,这正是您想要的。

另外,您可能希望使executor成为一个全局变量,而不是使用with,这样流程池就可以被所有使用者共享,并且与程序一样持久。这样,消费者将重用已经生成的工作进程,而不必为从队列接收的每个作业生成一个新的进程。(这就是拥有一个流程“池”的全部意义。)在这种情况下,您可能希望在程序中不再需要执行器的时候添加executor.shutdown()

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65905372

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档