
Kafka sink connector -> Postgres fails with Avro JSON data

Asked by a Stack Overflow user on 2021-02-14 17:12:56 · 1 answer · 509 views · 0 followers · 0 votes

I set up a Kafka sink connector to send events to PostgreSQL. I wrote this simple producer, which sends JSON with schema (Avro) data to the topic, like this:

producer.py (kafka-python)

biometrics = {
        "heartbeat": self.pulse,  # integer
        "oxygen": self.oxygen,  # integer
        "temprature": self.temprature,  # float
        "time": time  # string
    }

avro_value = {
               "schema": open(BASE_DIR + "/biometrics.avsc").read(),
               "payload": biometrics
             }

producer.send("biometrics",
              key="some_string",
              value=avro_value
              )
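The question does not show how this producer serializes the value; presumably a JSON value_serializer (an assumption, not shown in the question) is configured on the kafka-python producer, which is why the broker ends up with plain JSON bytes rather than Registry-framed Avro. A minimal sketch of that presumed serializer:

```python
import json

# Presumed kafka-python value_serializer (an assumption; the question does
# not show it): it turns the dict into plain JSON bytes.
value_serializer = lambda v: json.dumps(v).encode("utf-8")

msg = value_serializer({"schema": "...", "payload": {"heartbeat": 72}})
print(msg[:1])  # b'{' -- plain JSON text, no Confluent framing
```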

Value schema:

{
    "type": "record",
    "name": "biometrics",
    "namespace": "athlete",
    "doc": "athletes biometrics",
    "fields": [
        {
            "name": "heartbeat",
            "type": "int",
            "default": 0
        },
        {
            "name": "oxygen",
            "type": "int",
            "default": 0
        },
        {
            "name": "temprature",
            "type": "float",
            "default": 0.0
        },
        {
            "name": "time",
            "type": "string",
            "default": ""
        }
    ]
}

Connector config (host, password, etc. omitted):

{
    "name": "jdbc_sink",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "biometrics",
    "insert.mode": "insert",
    "auto.create": "true"
}

But my connector fails with three errors, and I can't find the cause of any of them:

TL;DR log version

(Error 1) Caused by: org.apache.kafka.connect.errors.DataException: biometrics
(Error 2) Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
(Error 3) Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Full log

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: biometrics
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:498)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
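A short stdlib sketch of what "Unknown magic byte!" means: the Confluent AvroConverter expects every message to begin with a 5-byte header (magic byte 0x00 plus a 4-byte big-endian Schema Registry ID), while the JSON the producer above sends begins with '{' (0x7b). The schema ID 42 below is purely illustrative:

```python
import json
import struct

# A JSON message like the producer sends: first byte is '{' (0x7b)
json_msg = json.dumps({"schema": "...", "payload": {"heartbeat": 72}}).encode()

# Confluent wire format: magic byte 0x00 + 4-byte big-endian schema ID,
# followed by the Avro-encoded body (omitted here; the header is the point).
framed_msg = struct.pack(">bI", 0, 42)

def has_magic_byte(msg: bytes) -> bool:
    """The first check the AvroConverter's deserializer performs."""
    return len(msg) >= 5 and msg[0] == 0

print(has_magic_byte(json_msg))    # False -> "Unknown magic byte!"
print(has_magic_byte(framed_msg))  # True
```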

Can someone help me understand these errors and their underlying causes?


1 Answer

Answered by a Stack Overflow user (accepted) on 2021-02-14 22:15:34

This error occurs because you would need to use the JsonConverter class with value.converter.schemas.enable=true in the connector, since that matches what is being produced; but the schema in your payload is not a Connect-JSON schema representation of the payload (it is an Avro schema), so it may still fail with only those changes.
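For reference, the envelope that Connect's JsonConverter (with value.converter.schemas.enable=true) expects uses Connect's own schema vocabulary (struct, int32, float), not Avro's. A sketch of what the value would then have to look like (field values illustrative):

```json
{
  "schema": {
    "type": "struct",
    "name": "biometrics",
    "fields": [
      {"field": "heartbeat",  "type": "int32",  "optional": false},
      {"field": "oxygen",     "type": "int32",  "optional": false},
      {"field": "temprature", "type": "float",  "optional": false},
      {"field": "time",       "type": "string", "optional": false}
    ]
  },
  "payload": {"heartbeat": 72, "oxygen": 98, "temprature": 36.6, "time": "2021-02-14T17:12:56Z"}
}
```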

If you want to actually send Avro, use the AvroProducer from the confluent-kafka library, which requires a running Schema Registry.
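A sketch of that approach, assuming confluent-kafka with its Avro extras is installed and that a broker plus Schema Registry are reachable at the URLs below (the URLs are illustrative assumptions):

```python
# Sketch only: assumes confluent-kafka[avro] is installed; the broker and
# Schema Registry URLs below are illustrative assumptions.
try:
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer
except ImportError:  # library not installed; the sketch still parses
    avro = AvroProducer = None

def make_producer(schema_path):
    """Build an AvroProducer that registers the schema with the registry
    and frames every message in the Confluent wire format."""
    value_schema = avro.load(schema_path)  # parse biometrics.avsc
    return AvroProducer(
        {
            "bootstrap.servers": "localhost:9092",           # assumption
            "schema.registry.url": "http://localhost:8081",  # assumption
        },
        default_value_schema=value_schema,
    )

# Usage (requires a running broker and Schema Registry):
# p = make_producer(BASE_DIR + "/biometrics.avsc")
# p.produce(topic="biometrics", value=biometrics)
# p.flush()
```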

Score: 1
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/66197959