目前,我正在尝试读取kafka主题中的数据,并使用从kafka主题中获取的数据异步调用rest-API。在这里,如果msg是Meher,rest-api会立即响应,否则响应将需要5秒
kafka-数据
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher代码如下:
app = faust.App(
'faustApp',
broker="kafka://localhost:9092",
value_serializer='raw',
)
app_topic = app.topic('topic_base')
@app.agent(app_topic,concurrency=1)
async def imports_news(articles):
async for article in articles:
val = article.decode('utf-8')
url = 'http://0.0.0.0:5050/' + val
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(data)
if __name__ == '__main__':
app.main()电流-输出:
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!预期-输出:
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!期望的是获得所有rest调用的响应,首先是即时响应,然后是延迟响应,但目前它是按顺序工作的。
如果我将并发数增加到5,它会给出预期的输出,但在并发数为1的情况下应该也是如此。不确定,我是否错过了这方面的something....any帮助?
update1:
我已经用普通的python尝试了同样的方法,asyncIO..It的工作效果和预期的一样。
import asyncio
import aiofiles
import aiohttp
async def get_player(player_name):
url = 'http://0.0.0.0:5050/' + player_name
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.text()
print(data)
loop = asyncio.get_event_loop()
player_args = ["Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
"Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
"Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo"]
loop.run_until_complete(
asyncio.gather(
*(get_player(args) for args in player_args)
)
)发布于 2021-04-05 00:54:43
从faust文档https://faust.readthedocs.io/en/latest/userguide/agents.html#id5可以看出,每个代理实例一次处理流的一个元素。流迭代不会在其可用元素上并行化,但单个代理实例将按顺序逐个处理流元素。
如果在处理流的元素时等待某些内容,则代理实例将不会移动到下一个元素(如果可用),直到该元素的处理完成。等待操作不会“解锁”代理,将其移动到下一个流元素,然后在第一个等待完成后恢复对第一个元素的处理。
另一方面,如果设置为concurrency=5,则有5个实例可以从流中获取项,并同时并行处理它们。
Asyncio.gather之所以工作,是因为协程被封装到任务中,并在一起并发运行,等待结果。
https://stackoverflow.com/questions/63924754
复制相似问题