我在视频教程中看到,Kafka Broker在producer发布消息时支持3种类型的确认。
0-开火后忘记
1- Leader Ack
2-所有代理的Ack
我正在使用Kafka的Java API来发布消息。这是必须使用特定于每个代理的server.properties为每个代理设置的内容,还是必须由生产者设置的内容?如果必须由生产者设置,请解释如何使用Java API进行设置。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class KafkaProducerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
try{
for(int i=0;i<150;i++) {
RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
System.out.println(" Offset = " + ack.offset());
System.out.println(" Partition = " + ack.partition());
}
} catch (Exception ex){
ex.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}发布于 2017-07-09 12:11:52
它是一个生产者属性,其设置类似于代码中的其他属性:
properties.put("acks","all");所有可配置生产者属性的列表都可以在here中找到。
您可能还想查看与此生产者配置相关的代理(或主题) property min.insync.replicas。
发布于 2020-06-29 19:13:29
我认为你应该理解ack的属性实际做了什么,并看看幕后的情况。如果没有问题,您将看到此属性已由生产者配置为。
例如,您不能丢失任何消息,如审核日志。下面是我们如何启动生产者配置的代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("acks", "all"); //We are using acks=all in order to get the strongest guarantee we can.
props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "5");这是一个很小但很强大的变化,它对消息是否到达产生了重大影响。
这张图片来自Kafka In Action书,它代表了更清晰的acks属性:



https://stackoverflow.com/questions/44992566
复制相似问题