我正在使用spring 2.2.8,并使用以下设置编写一个简单的异步生成器:
producer config key : compression.type and value is : none
producer config key : request.timeout.ms and value is : 10000
producer config key : acks and value is : all
producer config key : batch.size and value is : 33554431
producer config key : delivery.timeout.ms and value is : 1210500
producer config key : retry.backoff.ms and value is : 3000
producer config key : key.serializer and value is : class org.apache.kafka.common.serialization.StringSerializer
producer config key : security.protocol and value is : SSL
producer config key : retries and value is : 3
producer config key : value.serializer and value is : class io.confluent.kafka.serializers.KafkaAvroSerializer
producer config key : max.in.flight.requests.per.connection and value is : 1
producer config key : linger.ms and value is : 1200000
producer config key : client.id and value is : <<my app name>>我使用下面的代码片段打印了上面的生产者设置:
DefaultKafkaProducerFactory defaultKafkaProducerFactory = (DefaultKafkaProducerFactory) mykafkaProducerFactory;
Set<Entry> set = defaultKafkaProducerFactory.getConfigurationProperties().entrySet();
set.forEach( item ->
System.out.println("producer config key : "+item.getKey()+" and value is : "+item.getValue())
);现在,通过调用下面的构造函数,我创建了一个以KafkaTemplate为false的autoFlush
public KafkaTemplate(mykafkaProducerFactory, boolean autoFlush) 现在,我有了一个异步生成器,在10秒内生成10条消息。令人惊讶的是,我在几秒钟内就将所有10条消息发布到了主题上,我确信这10条消息的大小比我的batch.size: 33554431小得多
现在我的问题是
发布于 2020-07-09 20:17:07
看起来您没有正确地设置这些属性,请展示您是如何设置这些属性的。我刚用
batch.size=1000000
linger.ms=10000并连续发送了10条信息,他们花了整整10秒才到达消费者那里。
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch
spring.kafka.producer.properties.batch.size=1000000
spring.kafka.producer.properties.linger.ms=10000@SpringBootApplication
public class So62820095Application {
private static final Logger LOG = LoggerFactory.getLogger(So62820095Application.class);
public static void main(String[] args) {
SpringApplication.run(So62820095Application.class, args);
}
@KafkaListener(id = "so62820095", topics = "so62820095")
public void listen(List<String> in) {
LOG.info(in.toString());
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62820095").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> IntStream.range(0, 10).forEach(i -> template.send("so62820095", "foo" + i));
}
}https://stackoverflow.com/questions/62822749
复制相似问题