首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka python优雅关闭消费者

Kafka python优雅关闭消费者
EN

Stack Overflow用户
提问于 2020-04-28 15:21:54
回答 1查看 836关注 0票数 7

我试图优雅地关闭一个kafka用户,但脚本阻塞了停止HeartBeat线程。如何使用kafka-python在SIGTERM上优雅地关闭消费者。这就是我所做的

代码语言:javascript
复制
import logger as logging
import time
import sys
from kafka import KafkaConsumer
import numpy as np
import signal


log = logging.getLogger(__name__)


class Cons:
    def __init__(self):

        signal.signal(signal.SIGINT, self.sigterm_handler)
        signal.signal(signal.SIGTERM, self.sigterm_handler)
        self.consumer = KafkaConsumer('dummy-topic', group_id='poll-test', bootstrap_servers=['b1'])

    def sigterm_handler(self, signum, frame):
        log.info("Sigterm handler")
        self.consumer.close(autocommit=False)
        sys.exit(0)



    def consume(self):
        try:
            while True:
                records = self.consumer.poll(timeout_ms=500, max_records=500)
                for topic_partition, consumer_records in records.items():
                    for record in consumer_records:
                        log.info("Got Record - {}".format(record))
                    #code to manually commit


        except ValueError as e:
            log.exception("exception")


if __name__ == '__main__':
    c=Cons()
    c.consume()

在启用调试日志的情况下,这是我得到的输出,代码在这里被阻塞。

代码语言:javascript
复制
^C2020-04-28 07:18:33,050 - MainThread - __main__ - INFO - Sigterm handler
2020-04-28 07:18:33,050 - MainThread - kafka.consumer.group - DEBUG - Closing the KafkaConsumer.
2020-04-28 07:18:33,051 - MainThread - kafka.coordinator - INFO - Stopping heartbeat thread

这背后的原因是什么?在SIGTERM或SIGINT上关闭消费者的正确方法是什么?

EN

回答 1

Stack Overflow用户

发布于 2021-07-22 21:47:22

它看起来最终在2020年得到了修复,所以任何最近的版本都应该是可以的:

https://github.com/dpkp/kafka-python/commit/91daea329bb40ed80bddef4635770d24b670b0c6

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61474757

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档