我很好奇你应该如何表达你想要在faust中传递一条消息给Kafka主题。他们自述文件中的示例似乎没有写入主题:
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
@app.timer(interval=1.0)
async def example_sender(app):
await hello.send(
value=Greeting(from_name='Faust', to_name='you'),
)
if __name__ == '__main__':
app.main()我期望上面代码中的hello.send向主题发布一条消息,但它看起来并不是这样。
有很多阅读主题的例子,也有很多使用cli来推送即席消息的例子。在梳理文档之后,我没有看到任何在代码中发布到主题的明确示例。我是不是疯了,上面的代码应该可以工作?
发布于 2019-11-22 03:01:36
您可以使用sink告诉Faust将代理函数的结果传递到何处。如果你愿意,你也可以一次使用多个主题作为接收器。
@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
async for record in records:
result = do_something(record)
yield result发布于 2020-01-23 02:03:42
send()函数是用来写入主题的正确函数。您甚至可以指定一个特定的分区,就像等效的Java API调用一样。
下面是send()方法的引用:
https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send
发布于 2020-12-04 10:13:56
如果你只想要一个Faust生产者(不结合消费者/接收器),原来的问题实际上有正确的代码,这里有一个功能齐全的脚本,它将消息发布到一个'faust_test‘Kafka主题,任何Kafka/Faust消费者都可以使用。
像这样运行下面的代码:python faust_producer.py worker
"""Simple Faust Producer"""
import faust
if __name__ == '__main__':
"""Simple Faust Producer"""
# Create the Faust App
app = faust.App('faust_test_app', broker='localhost:9092')
topic = app.topic('faust_test')
# Send messages
@app.timer(interval=1.0)
async def send_message(message):
await topic.send(value='my message')
# Start the Faust App
app.main()https://stackoverflow.com/questions/56782434
复制相似问题