我在文档中找不到如何在使用confluent-kafka创建生产者时设置保留时间。
如果我只指定'bootstrap-servers‘,默认保留时间是1天。我希望能够改变这一点。
(我想在python API中完成这项工作,而不是在命令行上。)
发布于 2018-09-27 13:57:11
保留时间是在创建主题时设置的,而不是在生产者配置上设置的。
如果您的server.properties允许自动创建主题,那么您将在其中设置默认值。
否则,您可以使用AdminClient API发送支持config属性dict<str,str>的NewTopic请求
from confluent_kafka.admin import AdminClient, NewTopic
# a = AdminClient(...)
topics = list()
t = NewTopic(topic, num_partitions=3, replication_factor=1, config={'log.retention.hours': '168'})
topics.append(t)
# Call create_topics to asynchronously create topics, a dict
# of <topic,future> is returned.
fs = a.create_topics(topics)
# Wait for operation to finish.
# Timeouts are preferably controlled by passing request_timeout=15.0
# to the create_topics() call.
# All futures will finish at the same time.
for topic, f in fs.items():
try:
f.result() # The result itself is None
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))在同一链接中,您可以找到alter topic请求
发布于 2017-05-18 16:05:36
保留时间不是生产者的属性。默认保留时间是在代理配置文件server.properties和log.retention.hours等属性中设置的,例如安装上的/etc/kafka/server.properties ...depending。
您可以通过以下方式更改每个主题的保留时间:
$ <path-to-kafka>/bin/kafka-topics.sh --zookeeper <zookeeper-quorum> --alter --topic <topic-name> --config retention.ms=<your-desired-retention-in-ms>HTH……
https://stackoverflow.com/questions/44036398
复制相似问题