首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何处理卡夫卡水槽中的异常?

如何处理卡夫卡水槽中的异常?
EN

Stack Overflow用户
提问于 2018-09-13 07:45:14
回答 1查看 1.3K关注 0票数 1

我有一份Flink的工作,把数据写进卡夫卡。Kafka主题的最大消息大小设置为5MB,所以如果我尝试编写任何大于5MB的记录,它会抛出以下异常并将作业降低。

代码语言:javascript
复制
java.lang.Exception: Failed to send data to Kafka: The request included a message larger than the max message size the server will accept.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:350)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

现在,我已经在作业中配置了检查点,所以如果作业失败,它将重新启动。问题是,每次它重新启动,它都会为相同的记录失败,并进入一个无限循环的失败和重新启动。有没有办法在我的代码中处理这个Kafka异常,这样它就不会影响整个工作?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-09-13 11:49:57

也许你可以在卡夫卡水槽前面引入一个过滤器来检测和过滤那些太大的记录。有点烦躁,但可能很容易。否则,为了能够处理异常,我将考虑扩展FlinkKafkaProducer010。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52308911

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档