首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >永远运行kafka Consumer (新的Consumer API)

永远运行kafka Consumer (新的Consumer API)
EN

Stack Overflow用户
提问于 2016-06-29 17:34:46
回答 4查看 4.5K关注 0票数 1

我已经在Apache Kafka上建立了一个排队系统。应用程序将向特定的Kafka topic生成消息,而在使用者端,我必须使用生成到该主题的所有记录。

我使用新的Java consumer Api编写了Consumer。代码看起来像这样

代码语言:javascript
复制
  Properties props = new Properties();  
                     props.put("bootstrap.servers", kafkaBrokerIp+":9092");  
                     props.put("group.id",groupId);  
                     props.put("enable.auto.commit", "true");
                     props.put("session.timeout.ms", "30000");
                     props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
                     consumer.subscribe(Arrays.asList("consumertest"));  
                     while (true) {  
                         ConsumerRecords<String, String> records = consumer.poll(100);  
                         for (ConsumerRecord<String, String> record : records){  
                             System.out.println("Data recieved : "+record.value());  
                             }  
                     }

在这里,我需要永远运行消费者,这样生产者推送到kafka topic中的任何记录都应该立即被消费和处理。

所以我的困惑是,使用无限的while循环(就像在示例代码中一样)来使用数据是正确的方式吗?

EN

回答 4

Stack Overflow用户

发布于 2018-07-26 20:29:17

虽然可以使用无限循环,但可以在Kafka consumer documentation中找到一种稍微更优雅的方法,如下所示:

代码语言:javascript
复制
public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
           consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

这使您可以选择使用钩子正常关闭。

票数 2
EN

Stack Overflow用户

发布于 2016-06-30 18:14:59

是的,你可以使用无限循环。实际上,这并不是一个繁忙的循环。在每次轮询期间,如果数据不可用,调用将等待给定的时间段。

代码语言:javascript
复制
long millisToWait = 100;
consumer.poll(millisToWait);

新消费者自动处理网络通信问题。确保在关机时,消费者可以优雅地关闭。

票数 1
EN

Stack Overflow用户

发布于 2018-07-25 22:44:14

是的,这是使用无限循环消耗数据的正确方式。

消费者通常是长期运行的应用程序,不断轮询Kafka以获取更多数据。消费者必须继续轮询Kafka,否则他们将被认为是死亡的,他们正在消费的分区将被交给组中的另一个消费者继续消费。

poll()返回一个记录列表。每条记录都包含记录所来自的主题和分区、分区中记录的偏移量以及记录的键和值。记录的处理是特定于应用程序的。

如果您退出循环,总是在退出之前关闭()消费者。这将关闭网络连接和套接字,还将立即触发重新平衡。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38095711

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档