我开始尝试Kafka Streams了。我关注https://kafka.apache.org/0110/documentation/streams/quickstart。
我的沙箱是一个运行Ubuntu 16.04.2 LTS,Kafka 0.11.0.0和Scala 2.11.11的沙箱。
正如Kafka Streams快速入门指南中所解释的,以下是我遵循的步骤:
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer当使用后一个命令查看streams wordcount- output时,我的标准输出显示如下:
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1然后,在不中断bin/kafka- console -consumer.sh命令的情况下,我重新运行控制台生产者,如下所示:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt令我惊讶的是,标准输出没有发生变化,以反映新添加的中的更改。在我的理解中,file-input.txt用于生成额外的数据,因此字数计数应该已经刷新(所有标记现在都应该计数两次)。我的推理有什么问题?
发布于 2017-08-30 23:06:12
字数统计演示设计为在5秒后停止,如源:https://github.com/apache/kafka/blob/0.11.0.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java#L88中所示
查看上面源代码的最新版本,了解5秒后不会停止的源代码,但只有在按ctrl-c:https://github.com/apache/kafka/blob/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java时才会停止
https://stackoverflow.com/questions/45960076
复制相似问题