
Apache Flink: Kafka Producer in IDE execution not working as expected

Asked by a Stack Overflow user on 2019-05-05 14:47:28
1 answer, 294 views, 0 followers, score 0

I have a sample streaming WordCount job written in Flink (Scala). I want to write its results to Kafka using the Flink Kafka producer, but it does not work as expected.

My code is as follows:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

object WordCount {
  def main(args: Array[String]): Unit = {
    // set up the execution environment with a RocksDB state backend (incremental checkpoints enabled)
    val env = StreamExecutionEnvironment
      .getExecutionEnvironment
      .setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // make sure at least 500 ms of progress happen between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // checkpoints have to complete within one minute, or they are discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // prevent tasks from failing if an error happens during checkpointing; the checkpoint is just declined
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false)

    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

    // prepare Kafka consumer properties
    val kafkaConsumerProperties = new Properties
    kafkaConsumerProperties.setProperty("zookeeper.connect", "localhost:2181")
    kafkaConsumerProperties.setProperty("group.id", "flink")
    kafkaConsumerProperties.setProperty("bootstrap.servers", "localhost:9092")

    // prepare Kafka producer properties (the sink needs its own configuration)
    val kafkaProducerProperties = new Properties
    kafkaProducerProperties.setProperty("bootstrap.servers", "localhost:9092")

    // set up Kafka consumer
    val kafkaConsumer = new FlinkKafkaConsumer[String]("input", new SimpleStringSchema, kafkaConsumerProperties)

    println("Executing WordCount example.")

    // read text lines from the Kafka topic "input"
    val text = env.addSource(kafkaConsumer)

    val counts: DataStream[(String, Int)] = text
      // split lines into lowercase words and pair each word with 1: (word, 1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // key by the word (tuple field 0) and keep a running count per word in keyed state
      .keyBy(0)
      .mapWithState((in: (String, Int), count: Option[Int]) => {
        val newCount = count.getOrElse(0) + in._2
        ((in._1, newCount), Some(newCount))
      })

    // emit each (word, count) pair as a string to the Kafka topic "output"
    println("Writing results to Kafka topic 'output'.")
    counts
      .map(_.toString)
      .addSink(new FlinkKafkaProducer[String]("output", new SimpleStringSchema, kafkaProducerProperties))

    // execute program
    env.execute("Streaming WordCount")
  }
}
```

The data I send to the Kafka topic input is:

```text
hi
hello
```

I do not get any output in the Kafka topic output. Since I am new to Apache Flink, I don't know how to get the expected result. Can anyone help me get this working correctly?
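For reference, the records the job should produce can be worked out without Flink or Kafka. The following is a minimal plain-Scala sketch, assuming only the standard library; the object ExpectedOutput and the function runningWordCounts are hypothetical names, and a foldLeft over a Map stands in for the per-key state that mapWithState keeps in Flink. Because the job calls toString on each tuple before writing it, the values to look for in the output topic for the input above are (hi,1) and (hello,1).

```scala
// Plain-Scala model of the job's tokenize-and-count logic (no Flink, no Kafka).
object ExpectedOutput {
  // Mirrors flatMap/filter/map/keyBy/mapWithState: for every word, emit the
  // running count seen so far for that word, in arrival order.
  def runningWordCounts(lines: Seq[String]): Seq[(String, Int)] = {
    val words = lines.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty)
    // The Map plays the role of Flink's keyed state (one counter per word)
    val (_, emitted) = words.foldLeft((Map.empty[String, Int], Vector.empty[(String, Int)])) {
      case ((state, out), word) =>
        val newCount = state.getOrElse(word, 0) + 1
        (state.updated(word, newCount), out :+ (word -> newCount))
    }
    emitted
  }

  def main(args: Array[String]): Unit = {
    // The job writes each tuple via toString, so these strings are the record values
    runningWordCounts(Seq("hi", "hello")).map(_.toString).foreach(println)
    // prints (hi,1) then (hello,1)
  }
}
```

Repeated words show the running count: runningWordCounts(Seq("Hi hi", "hello")) gives (hi,1), (hi,2), (hello,1). In the real job, with parallelism greater than 1, records for different words may reach the topic in a different order.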


1 Answer

Accepted answer, posted by a Stack Overflow user on 2019-05-05 16:59:26

I ran your code in my local environment and everything worked fine. Try checking the output topic with the following command:

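```shell
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output --from-beginning
```

The --from-beginning flag makes the console consumer read the topic from the earliest offset, so records the job wrote before the consumer started are shown as well.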
Score: 1
Page content provided by Stack Overflow; translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/55989512
