我使用FlinkKafkaConsumer从Kafka读取数据,然后将数据转换为表,最后用FlinkSQL将接收器数据返回到kafka(kafka-连接器表)。为了得到准确的一次交付保证,我用属性设置了kafka表:sink.emantic=精确-一次。当进行测试时,我得到了错误“事务超时大于代理允许的最大值”。

Flink默认Kafka生产者最大事务超时: 1h

卡夫卡默认设置为transaction.max.timeout.ms=900000。
所以,我需要在卡夫卡制片人中添加"transaction.timeout.ms“属性。我的问题是在哪里可以使用FlinkSQL添加这个属性。
我的代码:
tableEnv.executeSql("INSERT INTO sink_kafka_table select * from source_table")我知道如何使用表api。
tableEnv.connect(new Kafka()
.version("")
.topic("")
.property("bootstrap.server","")
.property("transaction.timeout.ms","120000"))
.withSchema()
.withFormat()
.createTemporaryTable("sink_table")
table.executeInsert("sink_table")修改kafka配置文件不是很好的建议。任何建议都会有帮助的,谢谢提前。
发布于 2022-07-01 13:21:15
使用连接器声明https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/common/#connector-tables,您可以使用.option方法来设置properties.* 选项,该方法将被转发到kafka客户端,并去掉properties.。所以你需要设置properties.transaction.max.timeout.ms
您还可以使用SQL语句创建sink_table,并使用properties.*选项传递任何配置:https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#properties
我不太熟悉如何创建表,但我认为它在1.14中被废弃并删除了:https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/api/TableEnvironment.html#connect-org.apache.flink.table.descriptors.ConnectorDescriptor-方法注释建议创建执行SQL语句的表。
https://stackoverflow.com/questions/72829666
复制相似问题