我在从Kinesis流中读取Debezium更改日志时遇到了一些问题。我可以了解一下如何使用Flink SQL解析更改日志事件吗?
下面是我尝试通过Flink SQL客户端解析该流的尝试
Flink SQL> CREATE TABLE test_table (
> city_id INT,
> country_id INT,
> city STRING,
> last_update timestamp
> )
> WITH (
> 'connector' = 'kinesis',
> 'stream' = 'kinesis.sakila.city',
> 'aws.region' = 'us-east-1',
> 'scan.stream.initpos' = 'TRIM_HORIZON',
> 'format' = 'debezium-json'
> );
[INFO] Table has been created.
Flink SQL> select * from test_table;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Kinesis consumer does not support DeserializationSchema that implements deserialization with a Collector. Unsupported DeserializationSchema: org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema发布于 2021-02-23 04:47:34
Flink文档中有一个表格,其中显示了which connectors support each of the formats。在那里,您将看到Kinesis连接器不支持Debezium changelog格式。
https://stackoverflow.com/questions/66322894
复制相似问题