我有一个旧项目(它不是我的),我正在尝试将它从Kafka 2.1更新到2.4。
我有下面这段代码
public synchronized void increasePartitions(String topic, int partitions) throws InvalidPartitionsException, IllegalArgumentException {
StringBuilder commandString = new StringBuilder();
commandString.append("--alter");
commandString.append(" --topic ").append(topic);
commandString.append(" --zookeeper ").append(config.getOrDefault("zookeeper.connect",
"localhost:2181"));
commandString.append(" --partitions ").append(partitions);
String[] command = commandString.toString().split(" ");
TopicCommand.alterTopic(kafkaZkClient, new TopicCommand.TopicCommandOptions(command));
}它说TopicCommand的alterTopic方法不存在。我正在看文档,但我不知道如何解决它。
我需要这个方法来做完全相同的事情,但使用Kafka版本2.4。
发布于 2020-03-26 19:18:55
您应该使用Admin API来执行这样的任务。
为了添加分区,需要使用createPartitions()方法。
例如,要将my-topic的分区数增加到10:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
Admin admin = Admin.create(props);
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put("my-topic", NewPartitions.increaseTo(10));
CreatePartitionsResult createPartitions = admin.createPartitions(newPartitions);
createPartitions.all().get();https://stackoverflow.com/questions/60865575
复制相似问题