首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将数据发送到接收器的浮士德代理中的“无字符串参数编码”错误?

将数据发送到接收器的浮士德代理中的“无字符串参数编码”错误?
EN

Stack Overflow用户
提问于 2022-03-25 13:11:30
回答 1查看 147关注 0票数 0

我试图使用“浮士德代理”中的接收器将一些数据从一个Kafka主题发送到另一个主题,并不断收到以下错误:

代码语言:javascript
复制
File "/.../venv/lib/python3.8/site-packages/schema_registry/serializers/message_serializer.py", line 49, in <lambda>
    return lambda record, fp: schemaless_writer(fp, avro_schema.schema, record)
Crashed reason=TypeError('encoding without a string argument')

我可以在调试中看到它发生在schemaless_writer函数中的某个地方,但在源代码中它的代码不清楚,所以我不确定那里到底发生了什么:https://github.com/fastavro/fastavro/blob/357543aebb94a4fd593b035f8501b20ac66d3b17/fastavro/_write.pyi

我的想法是,它试图对发送的数据中的每个字段进行“编码”,但它并不适用于每个字段,这不是一个字符串(我有一些数字、布尔值、日期时间和数据)。在发送每个字段之前,我试图将其转换为字符串,但由于它与我的模式有冲突,所以无法工作,例如:

代码语言:javascript
复制
Crashed reason=ValueError("'True' (type <class 'str'>) do not match ['null', 'boolean']") 

也被绑在一起

代码语言:javascript
复制
await converted_payment_topic.send(value=result)

但得到了同样的结果。

以下是我的当前代码:

代码语言:javascript
复制
app = faust.App(f'testConvertedPayments',
          version=1,
          key_serializer='raw',
          value_serializer='raw',
          broker=KAFKA_URL,
          web_port=9090)

payment_topic = app.topic(payment_topic)
converted_payment_topic = app.topic(converted_payment_topic)

@app.agent(payment_topic, sink=[converted_payment_topic], concurrency=3)
async def payment_stream(payment):
  async for part in payment.take(1000, within=30):
    for x in part:
      result = make_payment_row(x) #just some data manipulation and making a dict
      result = {k: v for k, v in result.items() if v is not None}
      yield result

类似于我在文档中看到的所有示例,区别在于我的数据中不仅有字符串,还有解决这个问题的方法吗,而不将我的模式和所有数据类型更改为字符串?

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-04-01 12:53:48

想清楚了,我需要发送一个faust.Record模型,而不是一个白痴。可以使用from_dict类的faust.Record方法从dict生成记录。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71617518

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档