有没有人能让这个例子起作用?http://vertx.io/docs/vertx-kafka-client/java/
我尝试连接到正在运行的Kafka和Vertx-kafka-client 3.5.0,但没有成功。更奇怪的是,运行这个小代码片段,但根本没有任何kafka集群,消费者说它订阅成功。
public class KafkaConsumerExampleVerticle extends AbstractVerticle {
public static void main(String[] args) throws Exception {
KafkaConsumerExampleVerticle verticle = new KafkaConsumerExampleVerticle();
Vertx.vertx().deployVerticle(verticle);
}
@Override
public void start() throws Exception {
KafkaConsumer<String, String> consumer = createKafkaConsumer();
consumer.handler(m -> {
System.out.println("Consumer | message received: " + m);
});
consumer.subscribe("topic1", h -> {
if (h.succeeded()) {
System.out.println("Consumer subscribed!");
} else if (h.failed()){
System.out.println("Consumer failed: " + h.cause());
}
});
}
private KafkaConsumer<String, String> createKafkaConsumer() {
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "192.68.99.100:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
return consumer;
}
}无论我把哪个IP放在那里,或者我有没有运行任何kafka集群,消费者都会成功订阅。这是预期的行为吗?如果成功了,怎么知道设置出了什么问题呢?
发布于 2018-01-10 19:44:08
你看到的行为是我们所期望的,这是由于底层使用的“原生”Kafka客户端的行为。consumer.subscribe方法调用由“本机”客户端提供的底层"subscribe“方法,该方法不会启动任何连接,因此不会与代理进行任何通信(它只在本地保存有关订阅的信息)。“本机”Kafka客户端仅在调用本机"poll“方法时才启动与代理的所有连接(因此在本例中发送订阅请求)。在这一点上,另一个问题来了:实际上,如果指定的代理不可访问,“本地”轮询方法就会挂起。实际上,它不会挂起,但它会开始发送一个等待响应的“查找协调器”请求。默认的request.timeout.ms是305000 (大约5分钟),直到它认为请求失败。即使您将此值设置为较低的值,请求也会失败,但稍后会检测到连接失败。
https://stackoverflow.com/questions/48167986
复制相似问题