首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Flink中可以在哪里设置transaction.timeout.ms

在Flink中可以在哪里设置transaction.timeout.ms
EN

Stack Overflow用户
提问于 2022-07-01 13:01:20
回答 1查看 195关注 0票数 0

我使用FlinkKafkaConsumer从Kafka读取数据,然后将数据转换为表,最后用FlinkSQL将接收器数据返回到kafka(kafka-连接器表)。为了得到准确的一次交付保证,我用属性设置了kafka表:sink.emantic=精确-一次。当进行测试时,我得到了错误“事务超时大于代理允许的最大值”。

Flink默认Kafka生产者最大事务超时: 1h

卡夫卡默认设置为transaction.max.timeout.ms=900000。

所以,我需要在卡夫卡制片人中添加"transaction.timeout.ms“属性。我的问题是在哪里可以使用FlinkSQL添加这个属性。

我的代码:

代码语言:javascript
复制
tableEnv.executeSql("INSERT INTO sink_kafka_table select * from source_table")

我知道如何使用表api。

代码语言:javascript
复制
tableEnv.connect(new Kafka()
          .version("") 
          .topic("")
          .property("bootstrap.server","")
          .property("transaction.timeout.ms","120000"))
          .withSchema()
          .withFormat()
          .createTemporaryTable("sink_table")
 table.executeInsert("sink_table")

修改kafka配置文件不是很好的建议。任何建议都会有帮助的,谢谢提前。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-01 13:21:15

使用连接器声明https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/common/#connector-tables,您可以使用.option方法来设置properties.* 选项,该方法将被转发到kafka客户端,并去掉properties.。所以你需要设置properties.transaction.max.timeout.ms

您还可以使用SQL语句创建sink_table,并使用properties.*选项传递任何配置:https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/#properties

我不太熟悉如何创建表,但我认为它在1.14中被废弃并删除了:https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/api/TableEnvironment.html#connect-org.apache.flink.table.descriptors.ConnectorDescriptor-方法注释建议创建执行SQL语句的表。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72829666

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档