是否可以访问faust rpc回复中的kafka报头?下面是两个faust代理的示例。一个( pow)调用另一个(mul)并以值的形式接收结果。但是如何知道回复主题中的kafka头呢?
#!/usr/bin/env python
from typing import AsyncIterable
import faust
from faust import StreamT
app = faust.App('RPC99', reply_create_topic=True)
pow_topic = app.topic('RPC__pow')
mul_topic = app.topic('RPC__mul')
@app.agent(pow_topic)
async def pow(stream: StreamT[float]) -> AsyncIterable[float]:
async for value in stream:
yield await mul.ask(value=value ** 2)
# Headers for the returning result here?
@app.agent(mul_topic)
async def mul(stream: StreamT[float]) -> AsyncIterable[float]:
async for value in stream:
yield value * 100.0发布于 2021-02-17 23:00:35
您可以从接收到的事件中获取头部:
stream.events()
@app.agent(topic)
async def iterrate(stream):
async for event in stream.events():
value = event.value
offset = event.message.offset
headers = event.headersFaust.types.events的定义:
https://faust.readthedocs.io/en/latest/reference/faust.types.events.html#faust.types.events.EventT
发布于 2019-12-03 23:17:00
流事件似乎有一个Union[List[Tuple[str, bytes]]类型的headers属性。
docs在这里提供了它,但他们目前还没有关于这一点的详细信息。
看起来你可以做像这样的事情
@app.agent(topic)
async def process(stream):
async for value in stream:
do_something(value.headers)如果您已经反序列化了消息,那么看起来您还可以通过faust.streams.current_event()使用streams api访问原始事件。
编辑以回应Роман的评论:
看起来您必须显式地发送消息才能修改消息头。在channel api中查看send()的函数签名。我认为send或该类中的send的其他变体应该是您正在寻找的。
https://stackoverflow.com/questions/59114100
复制相似问题