我已经在Apache Kafka上建立了一个排队系统。应用程序将向特定的Kafka topic生成消息,而在使用者端,我必须使用生成到该主题的所有记录。
我使用新的Java consumer Api编写了Consumer。代码看起来像这样
Properties props = new Properties();
props.put("bootstrap.servers", kafkaBrokerIp+":9092");
props.put("group.id",groupId);
props.put("enable.auto.commit", "true");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("consumertest"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.println("Data recieved : "+record.value());
}
}在这里,我需要永远运行消费者,这样生产者推送到kafka topic中的任何记录都应该立即被消费和处理。
所以我的困惑是,使用无限的while循环(就像在示例代码中一样)来使用数据是正确的方式吗?
发布于 2018-07-26 20:29:17
虽然可以使用无限循环,但可以在Kafka consumer documentation中找到一种稍微更优雅的方法,如下所示:
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(10000);
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}这使您可以选择使用钩子正常关闭。
发布于 2016-06-30 18:14:59
是的,你可以使用无限循环。实际上,这并不是一个繁忙的循环。在每次轮询期间,如果数据不可用,调用将等待给定的时间段。
long millisToWait = 100;
consumer.poll(millisToWait);新消费者自动处理网络通信问题。确保在关机时,消费者可以优雅地关闭。
发布于 2018-07-25 22:44:14
是的,这是使用无限循环消耗数据的正确方式。
消费者通常是长期运行的应用程序,不断轮询Kafka以获取更多数据。消费者必须继续轮询Kafka,否则他们将被认为是死亡的,他们正在消费的分区将被交给组中的另一个消费者继续消费。
poll()返回一个记录列表。每条记录都包含记录所来自的主题和分区、分区中记录的偏移量以及记录的键和值。记录的处理是特定于应用程序的。
如果您退出循环,总是在退出之前关闭()消费者。这将关闭网络连接和套接字,还将立即触发重新平衡。
https://stackoverflow.com/questions/38095711
复制相似问题