首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当KafkaTemplate autoFlush设置为true时,多线程生产者性能如何?

当KafkaTemplate autoFlush设置为true时,多线程生产者性能如何?
EN

Stack Overflow用户
提问于 2018-10-17 10:57:40
回答 1查看 1K关注 0票数 1

KafkaTemplate有一个autoFlush选项,它会在每次发送消息时刷新。

代码语言:javascript
复制
/**
 * Send the producer record.
 * @param producerRecord the producer record.
 * @return a Future for the {@link RecordMetadata}.
 */
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
    if (this.transactional) {
        Assert.state(inTransaction(),
                "No transaction is in process; "
                    + "possible solutions: run the template operation within the scope of a "
                    + "template.executeInTransaction() operation, start a transaction with @Transactional "
                    + "before invoking the template method, "
                    + "run in a transaction started by a listener container when consuming a record");
    }
    final Producer<K, V> producer = getTheProducer();
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Sending: " + producerRecord);
    }
    final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
    producer.send(producerRecord, new Callback() {

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            try {
                if (exception == null) {
                    future.set(new SendResult<>(producerRecord, metadata));
                    if (KafkaTemplate.this.producerListener != null
                            && KafkaTemplate.this.producerListener.isInterestedInSuccess()) {
                        KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(),
                                producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata);
                    }
                }
                else {
                    future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                    if (KafkaTemplate.this.producerListener != null) {
                        KafkaTemplate.this.producerListener.onError(producerRecord.topic(),
                                producerRecord.partition(),
                                producerRecord.key(),
                                producerRecord.value(),
                                exception);
                    }
                }
            }
            finally {
                if (!KafkaTemplate.this.transactional) {
                    closeProducer(producer, false);
                }
            }
        }

    });
    if (this.autoFlush) {
        flush();
    }
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("Sent: " + producerRecord);
    }
    return future;
}

这对于那些想使每个发送请求同步的人来说似乎很好。

但当与产生单例生产者对象的DefaultKafkaProducerFactory一起使用时,KafkaTemplate的所有线程本地生产者都指向同一个单一生产者,从而共享发送队列。

在多线程web环境中,每个线程不仅要等待来自自己的消息,还要等待其他线程已经发送的所有消息。

我认为这不仅是性能方面的坏主意,也是可用性方面的坏主意,因为在一些Kafka代理崩溃的情况下,很可能所有想要发送消息的线程都挂起了,而其中一些线程并不需要这样做。

我说的对吗?在任何指南、文档或其他东西上不应该有这样的警告吗?

EN

回答 1

Stack Overflow用户

发布于 2018-10-17 20:59:57

我想JavaDocs很清楚..。

代码语言:javascript
复制
/**
 * Create an instance using the supplied producer factory and autoFlush setting.
 * <p>
 * Set autoFlush to {@code true} if you have configured the producer's
 * {@code linger.ms} to a non-default value and wish send operations on this template
 * to occur immediately, regardless of that setting, or if you wish to block until the
 * broker has acknowledged receipt according to the producer's {@code acks} property.
 * @param producerFactory the producer factory.
 * @param autoFlush true to flush after each send.
 * @see Producer#flush()
 */

你认为还需要什么?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52846677

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档