I have a sample streaming WordCount job written in Flink (Scala). In it, I want to write the results to Kafka using the Flink Kafka producer, but it does not work as expected.
My code is as follows:

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

    object WordCount {
      def main(args: Array[String]): Unit = {
        // set up the execution environment
        val env = StreamExecutionEnvironment
          .getExecutionEnvironment
          .setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))
        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000)
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
        // checkpoints have to complete within one minute, or are discarded
        env.getCheckpointConfig.setCheckpointTimeout(60000)
        // do not fail the tasks if an error happens during checkpointing; the checkpoint is just declined
        env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

        // prepare Kafka consumer properties
        val kafkaConsumerProperties = new Properties
        kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
        kafkaConsumerProperties.setProperty("group.id", "flink")
        kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")
        // set up Kafka consumer
        val kafkaConsumer = new FlinkKafkaConsumer[String]("input", new SimpleStringSchema, kafkaConsumerProperties)
        println("Executing WordCount example.")

        // get text from Kafka
        val text = env.addSource(kafkaConsumer)
        val counts: DataStream[(String, Int)] = text
          // split up the lines into pairs (2-tuples) containing: (word, 1)
          .flatMap(_.toLowerCase.split("\\W+"))
          .filter(_.nonEmpty)
          .map((_, 1))
          // group by tuple field "0" and keep a running count per word in keyed state
          .keyBy(0)
          .mapWithState((in: (String, Int), count: Option[Int]) =>
            count match {
              case Some(c) => ((in._1, c), Some(c + in._2))
              case None => ((in._1, 1), Some(in._2 + 1))
            })

        // emit result to the Kafka topic "output"
        println("Writing result to Kafka topic output.")
        // reuse the properties defined above; the producer needs bootstrap.servers from them
        counts.map(_.toString()).addSink(new FlinkKafkaProducer[String]("output", new SimpleStringSchema,
          kafkaConsumerProperties))

        // execute program
        env.execute("Streaming WordCount")
      }
    }

The data I send to the Kafka input topic is:
hi
hello
I do not get any output in the Kafka topic output. As I am new to Apache Flink, I do not know how to get the expected result. Can someone help me achieve the correct behavior?
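
For reference, the records this job should emit for that input can be worked out without a cluster or a broker. The following is a minimal plain-Scala sketch, not the Flink job itself: `WordCountSketch`, `step`, and `run` are hypothetical names introduced here, and an immutable `Map` stands in for Flink's keyed state. The body of `step` is copied from the function passed to `mapWithState` in the question; the split, filter, and map steps are the same as in the pipeline.

```scala
object WordCountSketch {
  // Same logic as the function passed to mapWithState in the question:
  // returns the record to emit and the new state for the word.
  def step(in: (String, Int), count: Option[Int]): ((String, Int), Option[Int]) =
    count match {
      case Some(c) => ((in._1, c), Some(c + in._2))
      case None    => ((in._1, 1), Some(in._2 + 1))
    }

  // Hypothetical helper: replays input lines through the same
  // split / filter / map steps, with a Map replacing Flink keyed state.
  def run(lines: Seq[String]): Seq[String] = {
    val pairs = lines
      .flatMap(_.toLowerCase.split("\\W+").toSeq)
      .filter(_.nonEmpty)
      .map((_, 1))

    val (_, emitted) =
      pairs.foldLeft((Map.empty[String, Int], Vector.empty[String])) {
        case ((state, acc), in) =>
          val (record, next) = step(in, state.get(in._1))
          // None would clear the state for this word; Some(c) updates it
          val newState = next.fold(state - in._1)(c => state.updated(in._1, c))
          // the job sends record.toString to Kafka, so collect the same string
          (newState, acc :+ record.toString)
      }
    emitted
  }

  def main(args: Array[String]): Unit =
    run(Seq("hi", "hello")).foreach(println)
}
```

Under these assumptions, the two input lines yield the strings `(hi,1)` and `(hello,1)`, so those are the values to look for in the output topic. A word that repeats produces an increasing count, one record per occurrence.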
Posted on 2019-05-05 16:59:26
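I ran your code in my local environment and everything worked fine. I think you can try the following command to read the output topic from the beginning: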

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output --from-beginning

https://stackoverflow.com/questions/55989512