首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka -生产者确认

Kafka -生产者确认
EN

Stack Overflow用户
提问于 2017-07-09 10:58:21
回答 2查看 24.1K关注 0票数 13

我在视频教程中看到,Kafka Broker在producer发布消息时支持3种类型的确认。

0-开火后忘记

1- Leader Ack

2-所有代理的Ack

我正在使用Kafka的Java API来发布消息。这是必须使用特定于每个代理的server.properties为每个代理设置的内容,还是必须由生产者设置的内容?如果必须由生产者设置,请解释如何使用Java API进行设置。

代码语言:javascript
复制
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class KafkaProducerApp {

    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);

        try{
            for(int i=0;i<150;i++) {
                RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
                System.out.println(" Offset = " + ack.offset());
                System.out.println(" Partition = " + ack.partition());
            }
        } catch (Exception ex){
            ex.printStackTrace();
        } finally {
            kafkaProducer.close();
        }



    }

}
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-07-09 12:11:52

它是一个生产者属性,其设置类似于代码中的其他属性:

代码语言:javascript
复制
properties.put("acks","all");

所有可配置生产者属性的列表都可以在here中找到。

您可能还想查看与此生产者配置相关的代理(或主题) property min.insync.replicas

票数 12
EN

Stack Overflow用户

发布于 2020-06-29 19:13:29

我认为你应该理解ack的属性实际做了什么,并看看幕后的情况。如果没有问题,您将看到此属性已由生产者配置为

例如,您不能丢失任何消息,如审核日志。下面是我们如何启动生产者配置的代码:

代码语言:javascript
复制
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("acks", "all"); //We are using acks=all in order to get the strongest guarantee we can.
props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "5");

这是一个很小但很强大的变化,它对消息是否到达产生了重大影响。

这张图片来自Kafka In Action书,它代表了更清晰的acks属性:

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44992566

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档