我有个简单的浮士德特工。消费kafka topic中的json,默认faust序列化器解析为dicts:
@app.agent(source_topic, sink=[destination_topic])
async def fetch(records):
async for record in records:
result = do_some_stuff(record)
yield result反序列化本身发生在我的代码之外的某个地方,它由faust框架管理,而不是我。我如何捕获和处理反序列化异常,例如在无效的json的情况下?
发布于 2021-05-03 21:11:41
您可以手动序列化数据并捕获错误:
topic = app.topic('custom', value_type=bytes)
@app.agent
async def processor(stream):
async for payload in stream:
data = json.loads(payload)来源:https://faust.readthedocs.io/en/latest/userguide/models.html
https://stackoverflow.com/questions/67334118
复制相似问题