这段代码的行为不稳定：有时它会把主题里从头开始的所有消息都给我，然后继续等待下一条消息；有时它只是一直在等待下一条消息，什么都不输出。
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class TestConsumer{
public static void main(String[] args) {
    // Consumer configuration: ZooKeeper quorum and consumer-group identity.
    // NOTE(review): the 400 ms session timeout comes from the original
    // example and is aggressively low — confirm it doesn't cause rebalances.
    Properties props = new Properties();
    props.put("zookeeper.connect", "sandbox.hortonworks.com:2181");
    props.put("group.id", "group-4");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "200");
    // FIX: without these two settings the behavior is nondeterministic.
    // When the group already has a committed offset, a run resumes from it
    // (and appears to just wait); when it has none, a run starts from the
    // beginning. "smallest" starts at the earliest offset whenever no
    // offset exists, and disabling auto-commit prevents offsets from being
    // stored, so every run replays the topic from the start.
    props.put("auto.offset.reset", "smallest");
    props.put("auto.commit.enable", "false");
    ConsumerConfig config = new ConsumerConfig(props);
    ConsumerConnector consumer =
        kafka.consumer.Consumer.createJavaConsumerConnector(config);
    String topic = "News";
    System.out.println("Running");
    try {
        Run(consumer, topic);
    } finally {
        // FIX: release the ZooKeeper session and broker connections on exit.
        consumer.shutdown();
    }
}
/**
 * Consumes messages from {@code topic}, printing each one and finally the
 * total count. Blocks in {@code hasNext()} until a message arrives unless
 * "consumer.timeout.ms" is configured, in which case iteration ends with a
 * {@code ConsumerTimeoutException} once the topic goes quiet.
 */
public static void Run(ConsumerConnector consumer, String topic) {
    // Request a single stream (one consumer thread) for the topic.
    HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    int count = 0;
    System.out.println("Waiting");
    try {
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msgAndData = it.next();
            // NOTE(review): decodes with the platform default charset;
            // pass an explicit charset if producers write UTF-8.
            String msg = new String(msgAndData.message());
            System.out.println(msg);
            count++;
        }
    } catch (ConsumerTimeoutException timeout) {
        // No message arrived within "consumer.timeout.ms" — we are done.
    }
    // FIX: the original accumulated a count (plus an unused list and an
    // unused "key" local) but never reported it; print the total so the
    // stated goal — counting the topic's messages — is actually met.
    System.out.println("Total messages: " + count);
}
}
我要做的就是从主题中获取所有发送给用户的消息，并对它们进行计数。
做这件事最好的方法是什么?
版本kafka_2.10-0.8.1.2.2.4.2-2
发布于 2017-02-16 14:24:57
以下是您的示例。
这里最重要的是Kafka的消费者配置属性:
props.put("auto.offset.reset", "smallest"); —— 将从队列的开头开始读取。
props.put("auto.commit.enable", "false"); —— 不会存储此消费者的偏移量。
props.put("consumer.timeout.ms", "5000"); —— 将等待 5 秒的消息，如果没有更多的消息就放弃。
整个示例：
package com.xxx.yyy.zzz;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumer {
// High-level consumer connection; owns the ZooKeeper session and is shut
// down at the end of getData().
private final ConsumerConnector consumer;
// The single topic this consumer reads from.
private final String topic;
// Number of messages collected by the last getData() run.
private int count = 0;
/**
 * Creates a consumer connected to the given ZooKeeper quorum as a member of
 * the given consumer group, bound to a single topic.
 */
public KafkaConsumer(final String zookeeper, final String groupId, final String topic) {
    this.topic = topic;
    this.consumer =
        Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
}
// Initialize connection properties to Kafka and Zookeeper
/**
 * Builds the Kafka/ZooKeeper connection settings for a consumer that always
 * replays the topic from the beginning and stops after 5 s of silence.
 */
private static ConsumerConfig createConsumerConfig(final String zookeeper, final String groupId) {
    final Properties settings = new Properties();
    // Cluster location and consumer-group identity.
    settings.setProperty("zookeeper.connect", zookeeper);
    settings.setProperty("group.id", groupId);
    // Start at the earliest offset when none is stored, and never store
    // one — so every run reads the topic from the beginning.
    settings.setProperty("auto.offset.reset", "smallest");
    settings.setProperty("auto.commit.enable", "false");
    // Give up after 5 s without a message (raises ConsumerTimeoutException).
    settings.setProperty("consumer.timeout.ms", "5000");
    // ZooKeeper session tuning.
    settings.setProperty("zookeeper.session.timeout.ms", "2000");
    settings.setProperty("zookeeper.sync.time.ms", "250");
    settings.setProperty("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(settings);
}
/**
 * Reads every message currently in the topic, prints each one, and stores
 * the total in {@code count}. Relies on "consumer.timeout.ms": collection
 * runs until a {@code ConsumerTimeoutException} signals that no message
 * arrived within the timeout, which is treated as "done". The consumer is
 * shut down afterwards, so this method can only be called once.
 */
private void getData() {
    // FIX: the original used a raw ArrayList; use the generic form.
    List<byte[]> msgs = new ArrayList<>();
    Map<String, Integer> topicMap = new HashMap<>();
    // Define a single stream (one consumer thread) for the topic.
    topicMap.put(topic, 1);
    try {
        Map<String, List<KafkaStream<byte[], byte[]>>> listMap =
            consumer.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> kafkaStreams = listMap.get(topic);
        // Drain the streams; blocks until the consumer timeout fires.
        kafkaStreams.forEach(ks -> ks.forEach(mam -> msgs.add(mam.message())));
    } catch (ConsumerTimeoutException exception) {
        // Timeout means no more messages are available -> we are done.
        // FIX: printing a byte[] directly yields "[B@..." (the array's
        // identity hash), not the message text — decode it first.
        // NOTE(review): uses the platform default charset; pass an explicit
        // charset if producers write UTF-8.
        msgs.forEach(m -> System.out.println(new String(m)));
        // Count them.
        count = msgs.size();
    } finally {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}
}
https://stackoverflow.com/questions/37111935
复制相似问题