我正在研究用例,它需要处理近600条消息/秒(订阅主题、转换、保存到Server表并生成回主题),但我们每5个实例只处理100条消息/秒。我们不能增加更多的实例来实现这一点。有什么建议会有帮助吗?
技术和基础设施:在PCF中部署了带有卡夫卡监听器(没有批处理侦听器)的spring引导应用程序。源主题和输出主题各有10个分区。默认属性和设置正在使用。转化需要几秒的时间。
发布于 2020-07-27 21:07:01
我有一个类似的用例,我提高了性能,将并发性(10)添加到每个侦听器,并使用以下配置增加队列中的分区
@Bean
public ThreadPoolTaskExecutor messageProcessorExecutor() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(poolSize);
exec.setMaxPoolSize(poolMaxSize);
exec.setKeepAliveSeconds(keepAlive);
return exec;
}
@Bean
public ConsumerFactory<String, Request> consumerFactory() {
DefaultKafkaConsumerFactory<String, Request> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfigs());
consumerFactory.setKeyDeserializer(new StringDeserializer());
consumerFactory.setValueDeserializer(new JsonDeserializer<>(Request.class));
return consumerFactory;
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Request>> kafkaListenerContainerFactory(
ThreadPoolTaskExecutor messageProcessorExecutor,
ConsumerFactory<String, Request> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Request> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(10);
factory.getContainerProperties().setPollTimeout(pollTimeout);
factory.getContainerProperties().setConsumerTaskExecutor(messageProcessorExecutor);
return factory;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}https://stackoverflow.com/questions/63070785
复制相似问题