我试图通过火花流程序消费卡夫卡制作人的信息。
这是我的节目
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
// lines.print()
lines.foreachRDD(rdd=>{
rdd.foreach(message=>
println(message))
})以上程序已成功运行。但我没看到任何信息被打印出来。
发布于 2017-02-11 13:31:30
使用"local[*]"设置主url
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")您还可以尝试打电话给get (),查看是否收到消息。
lines.foreachRDD { rdd =>
rdd.collect().foreach(println)
}https://stackoverflow.com/questions/42176345
复制相似问题