首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Vertx Kafka客户端消费者示例不工作且未显示任何错误

Vertx Kafka客户端消费者示例不工作且未显示任何错误
EN

Stack Overflow用户
提问于 2018-01-09 20:11:36
回答 1查看 1.2K关注 0票数 1

有没有人能让这个例子起作用?http://vertx.io/docs/vertx-kafka-client/java/

我尝试连接到正在运行的Kafka和Vertx-kafka-client 3.5.0,但没有成功。更奇怪的是,运行这个小代码片段,但根本没有任何kafka集群,消费者说它订阅成功。

代码语言:javascript
复制
public class KafkaConsumerExampleVerticle extends AbstractVerticle {

    public static void main(String[] args) throws Exception {
        KafkaConsumerExampleVerticle verticle = new KafkaConsumerExampleVerticle();
        Vertx.vertx().deployVerticle(verticle);
    }

    @Override
    public void start() throws Exception {

        KafkaConsumer<String, String> consumer = createKafkaConsumer();

        consumer.handler(m -> {
            System.out.println("Consumer | message received: " + m);
        });

        consumer.subscribe("topic1", h -> {
            if (h.succeeded()) {
                System.out.println("Consumer subscribed!");
            } else if (h.failed()){
                System.out.println("Consumer failed: " + h.cause());                
            }
        });

    }

    private KafkaConsumer<String, String> createKafkaConsumer() {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", "192.68.99.100:9092");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("group.id", "my_group");
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
        return consumer;
    }

}

无论我把哪个IP放在那里,或者我有没有运行任何kafka集群,消费者都会成功订阅。这是预期的行为吗?如果成功了,怎么知道设置出了什么问题呢?

EN

回答 1

Stack Overflow用户

发布于 2018-01-10 19:44:08

你看到的行为是我们所期望的,这是由于底层使用的“原生”Kafka客户端的行为。consumer.subscribe方法调用由“本机”客户端提供的底层"subscribe“方法,该方法不会启动任何连接,因此不会与代理进行任何通信(它只在本地保存有关订阅的信息)。“本机”Kafka客户端仅在调用本机"poll“方法时才启动与代理的所有连接(因此在本例中发送订阅请求)。在这一点上,另一个问题来了:实际上,如果指定的代理不可访问,“本地”轮询方法就会挂起。实际上,它不会挂起,但它会开始发送一个等待响应的“查找协调器”请求。默认的request.timeout.ms是305000 (大约5分钟),直到它认为请求失败。即使您将此值设置为较低的值,请求也会失败,但稍后会检测到连接失败。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48167986

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档