首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >挂卡夫卡-生产者-网络线程阻止应用程序退出

挂卡夫卡-生产者-网络线程阻止应用程序退出
EN

Stack Overflow用户
提问于 2020-08-07 15:02:13
回答 1查看 1.6K关注 0票数 0

我有一个应用程序,它使用KafkaStreams并将更新写入下游数据库(Redis)。当下游数据库服务关闭时,应用程序试图优雅地关闭;但是,它会到达main()的末尾,并由于Kafka的未关闭网络线程而挂起。我可以从日志中确认KafkaStreams是使用kafkaStreams.close()关闭的:

代码语言:javascript
复制
2020-08-07 14:40:54.723  INFO 1 --- [  shutdown-hook] org.apache.kafka.streams.KafkaStreams    : stream-client [imc1-retrospect-processor-v34-31881bef-a717-4d63-b548-5c4754329f1f] Already in the pending shutdown state, wait to complete shutdown
2020-08-07 14:40:54.723  INFO 1 --- [  shutdown-hook] org.apache.kafka.streams.KafkaStreams    : stream-client [imc1-retrospect-processor-v34-31881bef-a717-4d63-b548-5c4754329f1f] Streams client stopped completely

在上面的日志消息之后,我还记录了所有活动JVM线程的调用堆栈,我得到了其中的15个:

代码语言:javascript
复制
Thread kafka-producer-network-thread | producer-15
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at org.apache.kafka.common.network.Selector.select(Selector.java:794)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
2020-08-07 14:40:54.721  INFO 1 --- [           main] <my_app>   :  at java.lang.Thread.run(Thread.java:748)

有人能帮我解释一下这种行为吗?这是kafkaStreams.close()中的一个bug吗?我也很困惑,为什么它看起来像一个生产者线程正在轮询;轮询不应该只由kafka消费者线程完成吗?

EN

回答 1

Stack Overflow用户

发布于 2020-08-18 14:29:41

您可能需要使用KafkaStreams#close(Duration)来指定close()的超时。按设计,close() (无参数)块。

关于close()和超时值有一些已知的问题:

还请注意,在堆栈跟踪中显示的NetworkClient.pollConsumer.poll完全不同。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63304367

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档