首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用faust streamming访问rpc回复中的kafka标头

使用faust streamming访问rpc回复中的kafka标头
EN

Stack Overflow用户
提问于 2019-11-30 15:20:57
回答 2查看 292关注 0票数 0

是否可以访问faust rpc回复中的kafka报头?下面是两个faust代理的示例。一个( pow)调用另一个(mul)并以值的形式接收结果。但是如何知道回复主题中的kafka头呢?

代码语言:javascript
复制
#!/usr/bin/env python
from typing import AsyncIterable
import faust
from faust import StreamT

app = faust.App('RPC99', reply_create_topic=True)
pow_topic = app.topic('RPC__pow')
mul_topic = app.topic('RPC__mul')

@app.agent(pow_topic)
async def pow(stream: StreamT[float]) -> AsyncIterable[float]:
    async for value in stream:
        yield await mul.ask(value=value ** 2)
        # Headers for the returning result here?

@app.agent(mul_topic)
async def mul(stream: StreamT[float]) -> AsyncIterable[float]:
    async for value in stream:
        yield value * 100.0
EN

回答 2

Stack Overflow用户

发布于 2021-02-17 23:00:35

您可以从接收到的事件中获取头部:

stream.events()

代码语言:javascript
复制
@app.agent(topic)
async def iterrate(stream):
    async for event in stream.events():
        value = event.value
        offset = event.message.offset
        headers = event.headers

Faust.types.events的定义:

https://faust.readthedocs.io/en/latest/reference/faust.types.events.html#faust.types.events.EventT

票数 1
EN

Stack Overflow用户

发布于 2019-12-03 23:17:00

流事件似乎有一个Union[List[Tuple[str, bytes]]类型的headers属性。

docs在这里提供了它,但他们目前还没有关于这一点的详细信息。

看起来你可以做像这样的事情

代码语言:javascript
复制
@app.agent(topic)
async def process(stream):
    async for value in stream:
        do_something(value.headers)

如果您已经反序列化了消息,那么看起来您还可以通过faust.streams.current_event()使用streams api访问原始事件。

编辑以回应Роман的评论:

看起来您必须显式地发送消息才能修改消息头。在channel api中查看send()的函数签名。我认为send或该类中的send的其他变体应该是您正在寻找的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59114100

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档