当尝试使用java配置新创建的kafka主题时,值会被覆盖。
我尝试使用控制台命令设置相同的主题配置,并且它可以工作。不幸的是,当我尝试使用Java代码时,一些值会发生冲突并被覆盖。
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
Map<ConfigResource, Config> updateConfig = new HashMap<>();
// update retention Bytes for this topic
ConfigEntry retentionBytesEntry = new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(retentionBytes));
updateConfig.put(resource, new Config(Collections.singleton(retentionBytesEntry)));
// update retention ms for this topic
ConfigEntry retentionMsEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(retentionMs));
updateConfig.put(resource, new Config(Collections.singleton(retentionMsEntry)));
// update segment Bytes for this topic
ConfigEntry segmentBytesEntry = new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(segmentbytes));
updateConfig.put(resource, new Config(Collections.singleton(segmentBytesEntry)));
// update segment ms for this topic
ConfigEntry segmentMsEntry = new ConfigEntry(TopicConfig.SEGMENT_MS_CONFIG, String.valueOf(segmentMs));
updateConfig.put(resource, new Config(Collections.singleton(segmentMsEntry)));
// Update the configuration
client.alterConfigs(updateConfig);我希望主题能够正确地给出所有的配置值。
发布于 2019-01-03 16:45:09
您的逻辑无法正常工作,因为您使用相同的键多次调用Map.put()。因此,只保留最后一个条目。
指定多个主题配置的正确方法是将它们添加到ConfigEntry对象中。只有在将ConfigEntry添加到Map之后。
例如:
// Your Topic Resource
ConfigResource cr = new ConfigResource(Type.TOPIC, "mytopic");
// Create all your configurations
Collection<ConfigEntry> entries = new ArrayList<>();
entries.add(new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(segmentbytes)));
entries.add(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, String.valueOf(retentionBytes)));
...
// Create the Map
Config config = new Config(entries);
Map<ConfigResource, Config> configs = new HashMap<>();
configs.put(cr, config);
// Call alterConfigs()
admin.alterConfigs(configs);https://stackoverflow.com/questions/54024829
复制相似问题