首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KSQL:在尝试从流中选择*时发生杰克逊错误

KSQL:在尝试从流中选择*时发生杰克逊错误
EN

Stack Overflow用户
提问于 2019-11-14 01:02:03
回答 1查看 151关注 0票数 0

这个问题基于KSQL stream from topic with heterogeneous JSON structures中的解决方案路径--当我尝试从一个主题创建的带有value_format='json'的流中获取一个错误时,我会得到一个错误。我仍然可以print底层的主题,但是执行select * from stream;会引发一个错误。

我们使用的是合流5.3.1,也就是Kafka 2.3.1。

这是我如何创造这个问题..。

  1. 从一些简单的json数据

开始。

代码语言:javascript
复制
ksql> PRINT 'sms-reporting.status-updated';
Format:JSON
{"ROWTIME":1573688864403,"ROWKEY":"null","acceptTs":"1573603201000","eventType":"REPORTING","isFinal":"true","messageId":"8619Z-1113H-00007-01CF5","nodeId":"86","responseCode":"3041","submissionAttempt":"1","ts":"1573603201567","type":"statusUpdated"}

  1. 创建流:

代码语言:javascript
复制
ksql> CREATE STREAM statusupdated ( \
 acceptTs VARCHAR, \
 eventType VARCHAR, \
 isFinal VARCHAR, \
 messageId VARCHAR, \
 nodeId VARCHAR, \
 responseCode VARCHAR, \
 submissionAttempt VARCHAR, \
 ts VARCHAR, \
 type VARCHAR \
) with (kafka_topic='sms-reporting.status-updated',value_format='json');

 Message
----------------
 Stream created
----------------
ksql> DESCRIBE statusupdated;

Name                 : STATUSUPDATED
 Field             | Type
-----------------------------------------------
 ROWTIME           | BIGINT           (system)
 ROWKEY            | VARCHAR(STRING)  (system)
 ACCEPTTS          | VARCHAR(STRING)
 EVENTTYPE         | VARCHAR(STRING)
 ISFINAL           | VARCHAR(STRING)
 MESSAGEID         | VARCHAR(STRING)
 NODEID            | VARCHAR(STRING)
 RESPONSECODE      | VARCHAR(STRING)
 SUBMISSIONATTEMPT | VARCHAR(STRING)
 TS                | VARCHAR(STRING)
 TYPE              | VARCHAR(STRING)
-----------------------------------------------

  1. 试图检查流的内容(失败):

代码语言:javascript
复制
ksql> SELECT * FROM statusupdated;
Query terminated
java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])

如果我现在使用这个流(在这个流上,我仍然不能在不接收错误的情况下执行select语句)来创建另一个流(当然,这个流将由一个新的主题支持),我仍然不能在新流上select,但是,我可以print新的支持主题,并且数据会像预期的那样出现。

  1. 创建派生流

代码语言:javascript
复制
ksql> CREATE STREAM statusupdated2 AS SELECT acceptts FROM statusupdated;

 Message
----------------------------
 Stream created and running
----------------------------
ksql> describe statusupdated2;

Name                 : STATUSUPDATED2
 Field    | Type
--------------------------------------
 ROWTIME  | BIGINT           (system)
 ROWKEY   | VARCHAR(STRING)  (system)
 ACCEPTTS | VARCHAR(STRING)
--------------------------------------

  1. 试图检查派生流的内容(未成功):

代码语言:javascript
复制
ksql> SELECT * FROM statusupdated2;
Query terminated
java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
 at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])

  1. print新流的支持主题:

代码语言:javascript
复制
ksql> PRINT statusupdated2;
Format:JSON
{"ROWTIME":1573692743435,"ROWKEY":"null","ACCEPTTS":"1573603201000"}
EN

回答 1

Stack Overflow用户

发布于 2019-11-14 11:31:46

你试过了吗

集‘auto.offset.Reset’=‘最早’;

在运行select查询之前?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58847902

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档