首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >检查Python中是否存在Kafka主题

检查Python中是否存在Kafka主题
EN

Stack Overflow用户
提问于 2015-06-19 16:39:00
回答 4查看 10.7K关注 0票数 5

如果卡夫卡主题还不存在,我想要创建它。我知道如何通过bash创建一个主题,但我不知道如何检查它是否存在。

代码语言:javascript
复制
topic_exists = ??????
if not topic_exists:
    subprocess.call([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--create',  
        '--zookeeper', '{}:2181'.format(KAFKAHOST),
        '--topic', str(self.topic), 
        '--partitions', str(self.partitions),
        '--replication-factor', str(self.replication_factor)])
EN

回答 4

Stack Overflow用户

回答已采纳

发布于 2015-06-19 19:11:14

您可以为--list (List all available topics)使用kafka-topics.sh选项,并查看self.topic是否存在于topics数组中,如下所示。

根据主题的数量,这种方法可能有点重。如果是这样的话,您就可以不用使用--describe (List details for the given topics)了,如果主题不存在,它很可能会返回空的。我还没有对它进行彻底的测试,所以我不能确切地说这个解决方案(--describe)有多牢固,但是对您来说,进一步研究可能是值得的。

代码语言:javascript
复制
wanted_topics = ['host_updates_queue', 'foo_bar']

topics = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--list',
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

for wanted in wanted_topics:
    if wanted in topics:
        print '\'{}\' topic exists!'.format(wanted)
    else:
        print '\'{}\' topic does NOT exist!'.format(wanted)

    topic_desc = subprocess.check_output([os.path.join(KAFKABIN, 'kafka-topics.sh'),
        '--describe',
        '--topic', wanted,
        '--zookeeper', '{}:2181'.format(KAFKAHOST)])

    if not topic_desc:
        print 'No description found for the topic \'{}\''.format(wanted)

产出:

代码语言:javascript
复制
root@dev:/opt/kafka/kafka_2.10-0.8.2.1# ./t.py
'host_updates_queue' topic exists!
'foo_bar' topic does NOT exist!
No description found for the topic 'foo_bar'

还有一个可用的代理配置,因此您不必执行以下任何步骤:

启用服务器上主题的自动创建。如果将此设置为true,则尝试为不存在的主题生成数据或获取元数据将自动使用默认复制因子和分区数创建它。

如果可能的话,我会采取这种方法。

请注意,您应该为您的代理为server.propertiesdefault.replication.factor设置主题信任( topic ),以便在代码段中与您的设置相匹配。

票数 7
EN

Stack Overflow用户

发布于 2015-09-06 19:02:47

另一种很好的方法是使用python模块:

代码语言:javascript
复制
kafka_client = kafka.KafkaClient(kafka_server_name)
server_topics = kafka_client.topic_partitions

if topic_name in server_topics:
   your code....

kafka_client.topic_partitions返回主题列表。

票数 9
EN

Stack Overflow用户

发布于 2020-06-29 11:26:50

为此使用卡夫卡-python消费者 api。

代码语言:javascript
复制
import kafka 
consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=your_server_list) 
new_topics = set(wanted_topics)-set(consumer.topics())
for topic in new_topics:
    create(topic)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30943129

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档