首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >发布到kafka主题的Faust示例

发布到kafka主题的Faust示例
EN

Stack Overflow用户
提问于 2019-06-27 08:19:36
回答 5查看 5.2K关注 0票数 11

我很好奇你应该如何表达你想要在faust中传递一条消息给Kafka主题。他们自述文件中的示例似乎没有写入主题:

代码语言:javascript
复制
import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

我期望上面代码中的hello.send向主题发布一条消息,但它看起来并不是这样。

有很多阅读主题的例子,也有很多使用cli来推送即席消息的例子。在梳理文档之后,我没有看到任何在代码中发布到主题的明确示例。我是不是疯了,上面的代码应该可以工作?

EN

回答 5

Stack Overflow用户

发布于 2019-11-22 03:01:36

您可以使用sink告诉Faust将代理函数的结果传递到何处。如果你愿意,你也可以一次使用多个主题作为接收器。

代码语言:javascript
复制
@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_something(record)
        yield result
票数 5
EN

Stack Overflow用户

发布于 2020-01-23 02:03:42

send()函数是用来写入主题的正确函数。您甚至可以指定一个特定的分区,就像等效的Java API调用一样。

下面是send()方法的引用:

https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send

票数 5
EN

Stack Overflow用户

发布于 2020-12-04 10:13:56

如果你只想要一个Faust生产者(不结合消费者/接收器),原来的问题实际上有正确的代码,这里有一个功能齐全的脚本,它将消息发布到一个'faust_test‘Kafka主题,任何Kafka/Faust消费者都可以使用。

像这样运行下面的代码:python faust_producer.py worker

代码语言:javascript
复制
"""Simple Faust Producer"""
import faust

if __name__ == '__main__':
    """Simple Faust Producer"""

    # Create the Faust App
    app = faust.App('faust_test_app', broker='localhost:9092')
    topic = app.topic('faust_test')

    # Send messages
    @app.timer(interval=1.0)
    async def send_message(message):
        await topic.send(value='my message')

    # Start the Faust App
    app.main()
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56782434

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档