首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FAUST异步kafka消息处理并发不起作用

FAUST异步kafka消息处理并发不起作用
EN

Stack Overflow用户
提问于 2020-09-17 00:56:46
回答 1查看 394关注 0票数 0

目前,我正在尝试读取kafka主题中的数据,并使用从kafka主题中获取的数据异步调用rest-API。在这里,如果msg是Meher,rest-api会立即响应,否则响应将需要5秒

kafka-数据

代码语言:javascript
复制
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher
Waldo
Meher

代码如下:

代码语言:javascript
复制
app = faust.App(
    'faustApp',
    broker="kafka://localhost:9092",
    value_serializer='raw',
)

app_topic = app.topic('topic_base')
@app.agent(app_topic,concurrency=1)
async def imports_news(articles):
    async for article in articles:
        val = article.decode('utf-8')
        url = 'http://0.0.0.0:5050/' + val
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
            print(data)
if __name__ == '__main__':
    app.main()

电流-输出:

代码语言:javascript
复制
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!
Hello Meher!
Hello Waldo!

预期-输出:

代码语言:javascript
复制
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Meher!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!
Hello Waldo!

期望的是获得所有rest调用的响应,首先是即时响应,然后是延迟响应,但目前它是按顺序工作的。

如果我将并发数增加到5,它会给出预期的输出,但在并发数为1的情况下应该也是如此。不确定,我是否错过了这方面的something....any帮助?

update1:

我已经用普通的python尝试了同样的方法,asyncIO..It的工作效果和预期的一样。

代码语言:javascript
复制
import asyncio
import aiofiles
import aiohttp

async def get_player(player_name):
    url = 'http://0.0.0.0:5050/' + player_name
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            data = await resp.text()
    print(data)


loop = asyncio.get_event_loop()
player_args = ["Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
               "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo",
               "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo", "Meher", "Waldo"]
loop.run_until_complete(
    asyncio.gather(
        *(get_player(args) for args in player_args)
    )
)
EN

回答 1

Stack Overflow用户

发布于 2021-04-05 00:54:43

从faust文档https://faust.readthedocs.io/en/latest/userguide/agents.html#id5可以看出,每个代理实例一次处理流的一个元素。流迭代不会在其可用元素上并行化,但单个代理实例将按顺序逐个处理流元素。

如果在处理流的元素时等待某些内容,则代理实例将不会移动到下一个元素(如果可用),直到该元素的处理完成。等待操作不会“解锁”代理,将其移动到下一个流元素,然后在第一个等待完成后恢复对第一个元素的处理。

另一方面,如果设置为concurrency=5,则有5个实例可以从流中获取项,并同时并行处理它们。

Asyncio.gather之所以工作,是因为协程被封装到任务中,并在一起并发运行,等待结果。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63924754

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档