我试图使用“浮士德代理”中的接收器将一些数据从一个Kafka主题发送到另一个主题,并不断收到以下错误:
File "/.../venv/lib/python3.8/site-packages/schema_registry/serializers/message_serializer.py", line 49, in <lambda>
return lambda record, fp: schemaless_writer(fp, avro_schema.schema, record)
Crashed reason=TypeError('encoding without a string argument')我可以在调试中看到它发生在schemaless_writer函数中的某个地方,但在源代码中它的代码不清楚,所以我不确定那里到底发生了什么:https://github.com/fastavro/fastavro/blob/357543aebb94a4fd593b035f8501b20ac66d3b17/fastavro/_write.pyi
我的想法是,它试图对发送的数据中的每个字段进行“编码”,但它并不适用于每个字段,这不是一个字符串(我有一些数字、布尔值、日期时间和数据)。在发送每个字段之前,我试图将其转换为字符串,但由于它与我的模式有冲突,所以无法工作,例如:
Crashed reason=ValueError("'True' (type <class 'str'>) do not match ['null', 'boolean']") 也被绑在一起
await converted_payment_topic.send(value=result)但得到了同样的结果。
以下是我的当前代码:
app = faust.App(f'testConvertedPayments',
version=1,
key_serializer='raw',
value_serializer='raw',
broker=KAFKA_URL,
web_port=9090)
payment_topic = app.topic(payment_topic)
converted_payment_topic = app.topic(converted_payment_topic)
@app.agent(payment_topic, sink=[converted_payment_topic], concurrency=3)
async def payment_stream(payment):
async for part in payment.take(1000, within=30):
for x in part:
result = make_payment_row(x) #just some data manipulation and making a dict
result = {k: v for k, v in result.items() if v is not None}
yield result类似于我在文档中看到的所有示例,区别在于我的数据中不仅有字符串,还有解决这个问题的方法吗,而不将我的模式和所有数据类型更改为字符串?
谢谢!
发布于 2022-04-01 12:53:48
想清楚了,我需要发送一个faust.Record模型,而不是一个白痴。可以使用from_dict类的faust.Record方法从dict生成记录。
https://stackoverflow.com/questions/71617518
复制相似问题