我有一个应用程序,它使用KafkaStreams并将更新写入下游数据库(Redis)。当下游数据库服务关闭时,应用程序试图优雅地关闭;但是,它会到达main()的末尾,并由于Kafka的未关闭网络线程而挂起。我可以从日志中确认KafkaStreams是使用kafkaStreams.close()关闭的:
2020-08-07 14:40:54.723 INFO 1 --- [ shutdown-hook] org.apache.kafka.streams.KafkaStreams : stream-client [imc1-retrospect-processor-v34-31881bef-a717-4d63-b548-5c4754329f1f] Already in the pending shutdown state, wait to complete shutdown
2020-08-07 14:40:54.723 INFO 1 --- [ shutdown-hook] org.apache.kafka.streams.KafkaStreams : stream-client [imc1-retrospect-processor-v34-31881bef-a717-4d63-b548-5c4754329f1f] Streams client stopped completely在上面的日志消息之后,我还记录了所有活动JVM线程的调用堆栈,我得到了其中的15个:
Thread kafka-producer-network-thread | producer-15
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at org.apache.kafka.common.network.Selector.select(Selector.java:794)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
2020-08-07 14:40:54.721 INFO 1 --- [ main] <my_app> : at java.lang.Thread.run(Thread.java:748)有人能帮我解释一下这种行为吗?这是kafkaStreams.close()中的一个bug吗?我也很困惑,为什么它看起来像一个生产者线程正在轮询;轮询不应该只由kafka消费者线程完成吗?
发布于 2020-08-18 14:29:41
您可能需要使用KafkaStreams#close(Duration)来指定close()的超时。按设计,close() (无参数)块。
关于close()和超时值有一些已知的问题:
还请注意,在堆栈跟踪中显示的NetworkClient.poll与Consumer.poll完全不同。
https://stackoverflow.com/questions/63304367
复制相似问题