首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用数据生成的ksql-datagen实用程序反序列化异常

用数据生成的ksql-datagen实用程序反序列化异常
EN

Stack Overflow用户
提问于 2019-01-04 05:04:14
回答 1查看 547关注 0票数 0

从ksql-datagen实用程序从以下模式生成的示例流-

代码语言:javascript
复制
{
        "type": "record",
        "name": "users",
        **"namespace": "com.example",**
        "fields": [
        {
        "name": "registertime",
        "type": {
            "type":"long",
            "arg.properties":{
                "range":{"min":1487715775521,"max":1519273364600}
                }
        }
        },
        {
                "name": "userid",
                "type": {
            "type":"string",
            "arg.properties":{"regex":"User_[1-9][0-2]"}
        }
        },
        {
                "name": "regionid",
                "type": {
            "type":"string",
            "arg.properties":{"regex":"Region_[1-9]"}
        }
        },
        {
                "name": "gender",
                "type": {
            "type":"string",
            "arg.properties":{
            "options":["MALE","FEMALE","OTHER"]
            }
        }
        }
]}

在检查版本时,它仍然选择"io.confluent.ksql.avro_schemas“模式-

curl "http://localhost:8081/subjects/test-user-value/versions/1"

{"subject":"test-user-value","version":1,"id":4,“schema”:“{”type“:”KsqlDataSourceSchema“,”KsqlDataSourceSchema“字段”:[{“名称”:“KsqlDataSourceSchema”,“KsqlDataSourceSchema”字段“”:[{“名称”:“寄存器”,“类型”:“空”,“长”,“默认”:null},{“名称”:“userid”,"type":"null",“字符串”、“默认”:空}、{“名称”:“区域化”、“类型”:“空”、“字符串”、“默认”:空}、{“名称”:“性别”、“类型”:“空”、“字符串”、“默认”:空}

在使用Kafka-streams API时遇到以下错误-

线程"PageView-Users-Stream-Join-eg-1dc610a3-c9d9-4c1e-b5eb-910e4bc74826-StreamThread-1“org.apache.kafka.streams.errors.StreamsException:反序列化异常处理程序中的异常设置为在反序列化错误时失败。如果希望流管道在反序列化错误后继续运行,请适当设置default.deserialization.exception.handler。在org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:833) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) by: org.apache.kafka.common.errors.SerializationException:错误反序列化id 4的Avro消息,原因是: org.apache.kafka.common.errors.SerializationException:找不到写入器模式中指定的类io.confluent.ksql.avro_schemas.KsqlDataSourceSchema,同时为SpecificRecord.查找读取器的模式

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-01-09 19:43:54

https://github.com/confluentinc/schema-registry/issues/980回答

Datagen总是将名称空间定义为io.confluent.ksql.avro_schemas。请参阅汇合点/ksql#1906

现在也有了其他方式来生成卡夫卡的测试数据。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54033351

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档