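We are upgrading to Confluent Platform 5.5.2. After the upgrade, some (not all) of our Kafka JDBC Sink connectors started failing with the following error in the logs: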
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1812)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1567)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1687)
	at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1543)
	at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1226)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:108)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:495)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
	... 13 more
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "null", schema type: STRING
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
	at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
	... 23 more
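Has anyone run into the same problem and solved it?
We have not yet upgraded Schema Registry to the newer version (it is still on 5.0.0). Could upgrading Schema Registry help resolve this?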
Update:
Fixed: the field's type did not include the "null" value. After adding "null" to the type, the problem was resolved.
Posted on 2020-12-09 14:07:22
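Your error is probably caused by an inconsistent default in the Avro schema of one of your topics. Check every field that declares "default": null, and do not set null as the default when the field's type does not allow null.
For example:
Bad field: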
{
"default": null,
"name": "field_name",
"type": "string"
}
Good field:
{
"name": "field_name",
"type": "string"
}
Or, good field:
{
"default": null,
"name": "field_name",
"type": [
"null",
"string"
]
}
https://stackoverflow.com/questions/64716265
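If the schema is large, finding the offending field by eye can be tedious. The following is a minimal Python sketch of the check described above, not part of Confluent's tooling: it walks an Avro schema that has already been loaded as JSON and reports fields that declare "default": null while their type does not include "null". The helper names (allows_null, find_bad_null_defaults) and the example record are hypothetical, and the sketch only handles records, unions, arrays and maps.

```python
import json


def allows_null(avro_type):
    """Return True if an Avro type declaration accepts null."""
    if avro_type == "null":
        return True
    if isinstance(avro_type, list):  # a union such as ["null", "string"]
        return any(allows_null(branch) for branch in avro_type)
    if isinstance(avro_type, dict):  # a type object such as {"type": "null"}
        return allows_null(avro_type.get("type"))
    return False


def find_bad_null_defaults(schema, path=""):
    """Yield dotted paths of fields with "default": null and a non-nullable type."""
    if isinstance(schema, list):  # union: inspect every branch
        for branch in schema:
            yield from find_bad_null_defaults(branch, path)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            for field in schema.get("fields", []):
                name = field["name"]
                field_path = f"{path}.{name}" if path else name
                has_null_default = "default" in field and field["default"] is None
                if has_null_default and not allows_null(field["type"]):
                    yield field_path
                # Recurse so nested records are checked as well.
                yield from find_bad_null_defaults(field["type"], field_path)
        elif kind == "array":
            yield from find_bad_null_defaults(schema.get("items"), path)
        elif kind == "map":
            yield from find_bad_null_defaults(schema.get("values"), path)


# Hypothetical schema built from the three field variants shown above.
EXAMPLE_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Example",
  "fields": [
    {"name": "bad_field", "type": "string", "default": null},
    {"name": "good_required", "type": "string"},
    {"name": "good_nullable", "type": ["null", "string"], "default": null}
  ]
}
""")

BAD_FIELDS = list(find_bad_null_defaults(EXAMPLE_SCHEMA))
print(BAD_FIELDS)  # prints ['bad_field']
```

To use it on a real topic, load that topic's value schema as JSON (for example from a file exported from Schema Registry) and pass it to find_bad_null_defaults; each reported path is a field that needs either "null" added to its type or the null default removed.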