
Flink: Interrupted while waiting for data to be acknowledged by pipeline

Asked by a Stack Overflow user
on 2021-03-26 20:09:22
1 answer · 327 views · 0 followers · 1 vote

I am working on a Flink CDC + Iceberg POC. Following this Debezium tutorial, I send the CDC events to Kafka: https://debezium.io/documentation/reference/1.4/tutorial.html. My Flink job works fine for inserts and writes the data into the Hive table. However, as soon as I issue an UPDATE or DELETE query against the MySQL table, the Flink job starts failing with the error below. I have also attached the output of the retract stream.

Update query: UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

1> (true,1001,Sally,Thomas,sally.thomas@acme.com)
1> (true,1002,George,Bailey,gbailey@foobar.com)
1> (true,1003,Edward,Walker,ed@walker.com)
1> (true,1004,Anne,Kretchmar,annek@noanswer.org)
1> (true,1005,Sarah,Thompson,kitt@acme.com)
1> (false,1004,Anne,Kretchmar,annek@noanswer.org)
1> (true,1004,Anne Marie,Kretchmar,annek@noanswer.org)
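For reference, the retract flag in the output above means true = add the row, false = retract (remove) the previous version of the row. A minimal sketch of those semantics, with no Flink dependency (class and method names are illustrative only):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetractSketch {
    // Apply (flag, id, firstName) events to keyed state and return the final rows.
    // true  -> upsert the row for that key
    // false -> retract (remove) the row for that key
    static Map<Integer, String> apply(List<Object[]> events) {
        Map<Integer, String> state = new LinkedHashMap<>();
        for (Object[] e : events) {
            boolean add = (Boolean) e[0];
            int id = (Integer) e[1];
            String name = (String) e[2];
            if (add) {
                state.put(id, name);
            } else {
                state.remove(id);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        // The UPDATE above arrives as a retraction of the old row plus an add of the new one
        List<Object[]> events = Arrays.asList(
                new Object[]{true, 1004, "Anne"},       // initial insert
                new Object[]{false, 1004, "Anne"},      // retract old version
                new Object[]{true, 1004, "Anne Marie"}  // add updated version
        );
        System.out.println(apply(events).get(1004)); // prints "Anne Marie"
    }
}
```

Note that the mapping in my code below keeps only the Row payload and drops this boolean flag, so the sink by itself cannot tell a retraction apart from an insert.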

Error stack trace:

15:27:42.163 [Source: TableSourceScan(table=[[default_catalog, default_database, topic_customers]], fields=[id, first_name, last_name, email]) -> SinkConversionToTuple2 -> (Map -> Map -> IcebergStreamWriter, Sink: Print to Std. Out) (3/4)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
    at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:886) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:749) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:859) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:818) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.iceberg.shaded.org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64) ~[iceberg-flink-runtime-0.11.0.jar:?]

Here is my code. topic_customers is the Kafka dynamic table that listens to the CDC events:

Table out = tEnv.sqlQuery("select * from topic_customers");
// Convert to a retract stream: f0 is the change flag (true = add, false = retract)
DataStream<Tuple2<Boolean, Row>> dsRow = tEnv.toRetractStream(out, Row.class);
// Keep only the Row payload; the retract flag is discarded here
DataStream<Row> dsRow2 = dsRow.map((MapFunction<Tuple2<Boolean, Row>, Row>) x -> x.f1);
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier);
FlinkSink.forRow(dsRow2, TableSchema.builder()
        .field("id", DataTypes.BIGINT())
        .field("first_name", DataTypes.STRING())
        .field("last_name", DataTypes.STRING())
        .field("email", DataTypes.STRING())
        .build())
        .tableLoader(tableLoader)
        //.overwrite(true)
        .equalityFieldColumns(Collections.singletonList("id"))
        .build();

1 Answer

Stack Overflow user

Accepted answer

Posted on 2021-04-07 12:31:09

I fixed this by moving to the Iceberg v2 spec. You can refer to this PR: https://github.com/apache/iceberg/pull/2410
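For context (not part of the original answer): Iceberg's row-level deletes, which updates and deletes from a CDC stream rely on, require the v2 table format; v1 tables are append-only. One way to request v2 is the 'format-version' table property at table creation time. The catalog and table names below are hypothetical, and the DDL is a sketch to be checked against your Iceberg/Flink versions:

```sql
CREATE TABLE hive_catalog.db.customers (
    id         BIGINT,
    first_name STRING,
    last_name  STRING,
    email      STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    -- Iceberg v2 spec: enables the row-level (equality/position) deletes
    -- that an upsert CDC pipeline needs
    'format-version' = '2'
);
```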

Votes: 1
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/66816670
