我正在尝试将消息转发到faust的内部主题。正如faust在此示例中所建议的:
https://faust.readthedocs.io/en/latest/playbooks/vskafka.html
我有以下代码
in_topic = app.topic("in_topic", internal=False, partitions=1)
batching = app.topic("batching", internal=True, partitions=1)
....
@app.agent(in_topic)
async def process(stream):
async for event in stream:
event.forward(batching)
yield但在运行pytest时,我总是得到以下错误:
AttributeError: 'str' object has no attribute 'forward'这个特性是否被移除了,或者我需要以不同的方式指定主题才能获得事件,或者这甚至是pytest的一个问题?
发布于 2021-03-31 14:35:12
你在反向使用语法,这就是为什么!
in_topic = app.topic("in_topic", internal=False, partitions=1)
batching = app.topic("batching", internal=True, partitions=1)
....
@app.agent(in_topic)
async def process(stream):
async for event in stream:
batching.send(value=event)
yield应该能行得通。
编辑:
这也是我可以正确使用pytest和faust的唯一方法。
只有当您删除被模拟代理的最后一个接收器之外的所有接收器时,接收器方法才可用,这看起来并不直观。
如果您首先导入您的接收器,然后将此装饰器添加到您的测试中,那么一切都应该正常工作:
from my_path import my_sink, my_agent
from unittest.mock import patch, AsyncMock
@patch(__name__ + '.my_sink.send', new_callable=AsyncMock)
def test_my_agent(test_app)
async with my_agent.test_context() as agent:
payload = 'This_works_fine'
agent.put(payload)
my_sink.send.assert_called_with(value=payload)发布于 2021-03-04 03:58:46
试试下一个:
@app.agent(in_topic, sink=[batching])
async def process(stream):
async for event in stream:
yield event发布于 2021-04-01 15:56:42
来自@Florian Hall的答案工作得很好。我还发现forward方法只为事件实现,在我的例子中,我收到了一个str。原因可能是浮士德主题和渠道之间的差异。另一件事是,使用pytest和Faust test_context()的行为很奇怪,例如,即使您没有定义接收器,它也会迫使您放弃。
https://stackoverflow.com/questions/66157268
复制相似问题