我正在试图收回股票价格,并处理价格,因为他们来了。我是并发的初学者,但我认为这种设置似乎适合异步生产者-消费者模型,在这种模型中,每个生产者检索一个股票价格,并将其传递给消费者。现在,由于工作是CPU密集型的,消费者已经并行(多处理)进行了股票价格处理。因此,我已经有多个使用者在工作,而不是所有的生产者都完成了数据的检索。此外,我还想实施一个步骤,如果消费者发现它正在进行的股票价格是无效的,我们将为该股票生成一个新的消费者工作。
到目前为止,我有下面的玩具代码,它可以让我达到目的,但是我的process_data函数(消费者)有问题。
from concurrent.futures import ProcessPoolExecutor
import asyncio
import random
import time
random.seed(444)
#producers
async def retrieve_data(ticker, q):
'''
Pretend we're using aiohttp to retrieve stock prices from a URL
Place a tuple of stock ticker and price into asyn queue as it becomes available
'''
start = time.perf_counter() # start timer
await asyncio.sleep(random.randint(4, 8)) # pretend we're calling some URL
price = random.randint(1, 100) # pretend this is the price we retrieved
print(f'{ticker} : {price} retrieved in {time.perf_counter() - start:0.1f} seconds')
await q.put((ticker, price)) # place the price into the asyncio queue
#consumers
async def process_data(q):
while True:
data = await q.get()
print(f"processing: {data}")
with ProcessPoolExecutor() as executor:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, data_processor, data)
#if output of data_processing failed, send ticker back to queue to retrieve data again
if not result[2]:
print(f'{result[0]} data invalid. Retrieving again...')
await retrieve_data(result[0], q) # add a new task
q.task_done() # end this task
else:
q.task_done() # so that q.join() knows when the task is done
async def main(tickers):
q = asyncio.Queue()
producers = [asyncio.create_task(retrieve_data(ticker, q)) for ticker in tickers]
consumers = [asyncio.create_task(process_data(q))]
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers, too. blocks until all items in the queue have been received and processed
for c in consumers:
c.cancel() #cancel the consumer tasks, which would otherwise hang up and wait endlessly for additional queue items to appear
'''
RUN IN JUPYTER NOTEBOOK
'''
start = time.perf_counter()
tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
await main(tickers)
print(f'total elapsed time: {time.perf_counter() - start:0.2f}')
'''
RUN IN TERMINAL
'''
# if __name__ == "__main__":
# start = time.perf_counter()
# tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
# asyncio.run(main(tickers))
# print(f'total elapsed time: {time.perf_counter() - start:0.2f}')下面的data_processor()函数,由上面的process_data()调用,需要位于木星笔记本中的一个不同的单元中,或者一个单独的模块(据我所理解,以避免使用PicklingError)。
from multiprocessing import current_process
def data_processor(data):
ticker = data[0]
price = data[1]
print(f'Started {ticker} - {current_process().name}')
start = time.perf_counter() # start time counter
time.sleep(random.randint(4, 5)) # mimic some random processing time
# pretend we're processing the price. Let the processing outcome be invalid if the price is an odd number
if price % 2==0:
is_valid = True
else:
is_valid = False
print(f"{ticker}'s price {price} validity: --{is_valid}--"
f' Elapsed time: {time.perf_counter() - start:0.2f} seconds')
return (ticker, price, is_valid)THE ISSUES
result)和能够并行运行多个子进程之间做出选择。使用下面的构造,子进程按顺序运行,而不是并行运行。以ProcessPoolExecutor()作为执行器:循环= asyncio.get_running_loop()结果=等待loop.run_in_executor(executor,data_processor,data)
在result = await前面删除loop.run_in_executor(executor, data_processor, data)允许并行运行多个使用者,但是我不能从父进程收集他们的结果。我需要await。当然,剩下的代码块也会失败。
如何使这些子进程并行运行并提供输出?也许它需要一种不同于生产者-消费者模型的结构或其他东西。
#如果data_processing的输出失败,请将滴答器发送回队列,以便再次检索数据(如果不是result2: print(f'{result} data )无效。再次检索.‘)等待retrieve_data(结果,q) #添加一个新任务q.task_done() #结束此任务: q.task_done() #,以便q.join()知道任务何时完成
发布于 2021-01-26 21:00:11
,但似乎我必须在检索执行器调用的函数的输出(结果)和能够并行运行几个子进程之间做出选择。
幸运的是,情况并非如此,您还可以使用asyncio.gather()一次等待多个项目。但是您从队列中逐个获得数据项,因此您没有一批要处理的项。最简单的解决方案就是启动多个消费者。替换
# the single-element list looks suspicious anyway
consumers = [asyncio.create_task(process_data(q))]通过以下方式:
# now we have an actual list
consumers = [asyncio.create_task(process_data(q)) for _ in range(16)]每个使用者都会等待一个单独的任务完成,但这是可以的,因为您将有一个完整的池并行工作,这正是您想要的。
另外,您可能希望使executor成为一个全局变量,而不是使用with,这样流程池就可以被所有使用者共享,并且与程序一样持久。这样,消费者将重用已经生成的工作进程,而不必为从队列接收的每个作业生成一个新的进程。(这就是拥有一个流程“池”的全部意义。)在这种情况下,您可能希望在程序中不再需要执行器的时候添加executor.shutdown()。
https://stackoverflow.com/questions/65905372
复制相似问题