这个问题基于KSQL stream from topic with heterogeneous JSON structures中的解决方案路径--当我尝试从一个主题创建的带有value_format='json'的流中获取一个错误时,我会得到一个错误。我仍然可以print底层的主题,但是执行select * from stream;会引发一个错误。
我们使用的是合流5.3.1,也就是Kafka 2.3.1。
这是我如何创造这个问题..。
开始。
ksql> PRINT 'sms-reporting.status-updated';
Format:JSON
{"ROWTIME":1573688864403,"ROWKEY":"null","acceptTs":"1573603201000","eventType":"REPORTING","isFinal":"true","messageId":"8619Z-1113H-00007-01CF5","nodeId":"86","responseCode":"3041","submissionAttempt":"1","ts":"1573603201567","type":"statusUpdated"}ksql> CREATE STREAM statusupdated ( \
acceptTs VARCHAR, \
eventType VARCHAR, \
isFinal VARCHAR, \
messageId VARCHAR, \
nodeId VARCHAR, \
responseCode VARCHAR, \
submissionAttempt VARCHAR, \
ts VARCHAR, \
type VARCHAR \
) with (kafka_topic='sms-reporting.status-updated',value_format='json');
Message
----------------
Stream created
----------------
ksql> DESCRIBE statusupdated;
Name : STATUSUPDATED
Field | Type
-----------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACCEPTTS | VARCHAR(STRING)
EVENTTYPE | VARCHAR(STRING)
ISFINAL | VARCHAR(STRING)
MESSAGEID | VARCHAR(STRING)
NODEID | VARCHAR(STRING)
RESPONSECODE | VARCHAR(STRING)
SUBMISSIONATTEMPT | VARCHAR(STRING)
TS | VARCHAR(STRING)
TYPE | VARCHAR(STRING)
-----------------------------------------------ksql> SELECT * FROM statusupdated;
Query terminated
java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000","REPORTING","true","8619Z-1113H-00007-01CF5","86","3041","1","1573603201567","statusUpdated"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 206] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])如果我现在使用这个流(在这个流上,我仍然不能在不接收错误的情况下执行select语句)来创建另一个流(当然,这个流将由一个新的主题支持),我仍然不能在新流上select,,但是,我可以print新的支持主题,并且数据会像预期的那样出现。
ksql> CREATE STREAM statusupdated2 AS SELECT acceptts FROM statusupdated;
Message
----------------------------
Stream created and running
----------------------------
ksql> describe statusupdated2;
Name : STATUSUPDATED2
Field | Type
--------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ACCEPTTS | VARCHAR(STRING)
--------------------------------------ksql> SELECT * FROM statusupdated2;
Query terminated
java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])
Caused by: Unrecognized field "terminal" (class io.confluent.ksql.rest.entity.StreamedRow), not marked as ignorable (3 known properties: "finalMessage", "row", "errorMessage"])
at [Source: (String)"{"row":{"columns":[1573692743435,null,"1573603201000"]},"errorMessage":null,"finalMessage":null,"terminal":false}"; line: 1, column: 113] (through reference chain: io.confluent.ksql.rest.entity.StreamedRow["terminal"])print新流的支持主题:ksql> PRINT statusupdated2;
Format:JSON
{"ROWTIME":1573692743435,"ROWKEY":"null","ACCEPTTS":"1573603201000"}发布于 2019-11-14 11:31:46
你试过了吗
集‘auto.offset.Reset’=‘最早’;
在运行select查询之前?
https://stackoverflow.com/questions/58847902
复制相似问题