首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么异步生产者要生成消息,linger.ms和batch.size设置为大值,autoFlush设置为false?

为什么异步生产者要生成消息,linger.ms和batch.size设置为大值,autoFlush设置为false?
EN

Stack Overflow用户
提问于 2020-07-09 19:58:26
回答 1查看 2.1K关注 0票数 1

我正在使用spring 2.2.8,并使用以下设置编写一个简单的异步生成器:

代码语言:javascript
复制
producer config key : compression.type  and value is : none
producer config key : request.timeout.ms  and value is : 10000
producer config key : acks  and value is : all
producer config key : batch.size  and value is : 33554431
producer config key : delivery.timeout.ms  and value is : 1210500
producer config key : retry.backoff.ms  and value is : 3000
producer config key : key.serializer  and value is : class org.apache.kafka.common.serialization.StringSerializer
producer config key : security.protocol  and value is : SSL
producer config key : retries  and value is : 3
producer config key : value.serializer  and value is : class io.confluent.kafka.serializers.KafkaAvroSerializer
producer config key : max.in.flight.requests.per.connection  and value is : 1
producer config key : linger.ms  and value is : 1200000
producer config key : client.id  and value is : <<my app name>>

我使用下面的代码片段打印了上面的生产者设置:

代码语言:javascript
复制
DefaultKafkaProducerFactory defaultKafkaProducerFactory = (DefaultKafkaProducerFactory) mykafkaProducerFactory;
        Set<Entry> set  = defaultKafkaProducerFactory.getConfigurationProperties().entrySet();
        set.forEach( item ->
                System.out.println("producer config key : "+item.getKey()+"  and value is : "+item.getValue())
        );

现在,通过调用下面的构造函数,我创建了一个以KafkaTemplate为false的autoFlush

代码语言:javascript
复制
public KafkaTemplate(mykafkaProducerFactory, boolean autoFlush) 

现在,我有了一个异步生成器,在10秒内生成10条消息。令人惊讶的是,我在几秒钟内就将所有10条消息发布到了主题上,我确信这10条消息的大小比我的batch.size: 33554431小得多

现在我的问题是

  1. 为什么要发布消息而不是在生成消息之前等待linger.ms或batch.size?
EN

回答 1

Stack Overflow用户

发布于 2020-07-09 20:17:07

看起来您没有正确地设置这些属性,请展示您是如何设置这些属性的。我刚用

代码语言:javascript
复制
batch.size=1000000
linger.ms=10000

并连续发送了10条信息,他们花了整整10秒才到达消费者那里。

代码语言:javascript
复制
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.type=batch

spring.kafka.producer.properties.batch.size=1000000
spring.kafka.producer.properties.linger.ms=10000
代码语言:javascript
复制
@SpringBootApplication
public class So62820095Application {


    private static final Logger LOG = LoggerFactory.getLogger(So62820095Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So62820095Application.class, args);
    }

    @KafkaListener(id = "so62820095", topics = "so62820095")
    public void listen(List<String> in) {
        LOG.info(in.toString());
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62820095").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0,  10).forEach(i -> template.send("so62820095", "foo" + i));
    }

}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62822749

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档