我对Spark和Scala非常陌生,我正在使用ReduceByKeyAndWindows来计算kafka消息中的单词,因为我需要使用窗口功能。
我的应用程序的目的是:当检测到Kafka在特定时间内包含特定单词的消息达到"x"次时发出警报,然后从头重新开始计数。
下面的代码检测到了单词,但我无法使我的应用程序重新启动。我在想,如果可能的话,重新启动ReduceByKeyAndWindows的积累或其他方法来做到这一点。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
object KafKaWordCount {

  /**
   * Entry point: counts words arriving on a local socket over a sliding
   * 60-second window, refreshed every 2 seconds, and prints the counts.
   */
  def main(args: Array[String]) {
    // Two local cores; the app name shows up in the Spark UI.
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafKaWordCount")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2))
    // Checkpointing is mandatory when using the inverse-reduce window below.
    streamingContext.checkpoint("checkpoint")
    val lines = streamingContext.socketTextStream("localhost", 9999) //using NETCAT for test
    // Pair every word with 1, then maintain a windowed count:
    // add incoming batches, subtract batches leaving the 60s window.
    val windowedCounts = lines
      .map(word => (word, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2)
    //if the value from the key (word) exceeds 10 , sent alert and Restart the values
    windowedCounts.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}使用来自Yuval 的第二个例子,将reach从10减少到3,并发送7条消息。
第二个answer的输出是
Word: hello reached count: 1
Word: hello reached count: 2
//No print this message, its OK but the next word not start with 1
Word: hello reached count: 4
Word: hello reached count: 5
Word: hello reached count: 6
Word: hello reached count: 7我期望的输出
Word: hello reached count: 1
Word: hello reached count: 2
Word: hello reached count: 1
Word: hello reached count: 2
Word: hello reached count: 1发布于 2016-03-26 18:19:04
如果您使用的是Spark1.6.0及以上版本,则可以使用实验性DStream.mapWithState来保持单词计数的更新状态。一旦达到极限,您就可以删除状态并将其释放到管道中,并使用DStream.foreach打印出来。
object KafKaWordCount {

  /** Word-count threshold at which an alert fires and the state resets. */
  private val AlertThreshold = 10

  /**
   * Counts words from a socket over a sliding window and, via mapWithState,
   * emits a (word, count) pair downstream each time a word's count reaches
   * the alert threshold, resetting that word's state afterwards.
   */
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafKaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))
    // Required for both the inverse-reduce window and mapWithState.
    ssc.checkpoint("checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test
    val stateSpec = StateSpec.function(updateWordCount _)
    lines.map(x => (x, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2)
      .mapWithState(stateSpec)
      .filter(_.isDefined) // keep only words that reached the threshold
      .foreachRDD(rdd =>
        // BUG FIX: after filter(_.isDefined) the elements are still
        // Option[(String, Int)], so the match must be on Some(...) —
        // the original `case (word, count)` does not type-check.
        rdd.foreach { case Some((word, count)) =>
          println(s"Word: $word reached count: $count") })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * State function: tracks the latest windowed count per word. When the count
   * reaches the threshold, removes the state (restarting the count) and emits
   * the (word, count) pair; otherwise stores the count and emits nothing.
   *
   * @param key   the word being counted
   * @param value the windowed count produced upstream for this batch
   * @param state per-word running state
   * @return Some((word, count)) when the alert fires, None otherwise
   */
  def updateWordCount(key: String,
                      value: Option[Int],
                      state: State[(String, Int)]): Option[(String, Int)] = {
    def updateCountState(count: Int): Option[(String, Int)] = {
      // BUG FIX: use >= rather than == — the windowed count can jump past
      // the threshold within a single batch (e.g. 9 -> 11), in which case
      // an exact equality check would never fire the alert.
      if (count >= AlertThreshold) {
        if (state.exists()) state.remove()
        Some((key, count))
      }
      else {
        state.update((key, count))
        None
      }
    }
    value match {
      case Some(count) => updateCountState(count)
      case _ => None
    }
  }
}如果没有,则可以使用较慢的DStream.updateStateByKey。
object KafKaWordCount {

  /**
   * Pre-1.6 variant using updateStateByKey: counts words over a 60-second
   * tumbling window, alerting and resetting a word's state once its count
   * reaches the threshold.
   */
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafKaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))
    // updateStateByKey requires checkpointing.
    ssc.checkpoint("checkpoint")
    val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test
    // Key by the word but carry (word, count) as the value so the state
    // function can report the word without access to the key.
    lines.map(x => (x, (x, 1)))
      .reduceByKeyAndWindow((first: (String, Int), second: (String, Int)) =>
        (first._1, first._2 + second._2), Seconds(60), Seconds(60), 2)
      .updateStateByKey(updateSeqCount _)
      .print(1)
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * State function: keeps the latest (word, count) per key. When the count
   * reaches the threshold, prints an alert and clears the state (None) so
   * counting restarts; otherwise stores the new pair.
   *
   * BUG FIX: the original `state` parameter type was `Option[(String, Int]`,
   * a missing closing parenthesis that does not compile.
   */
  def updateSeqCount(values: Seq[(String, Int)],
                     state: Option[(String, Int)]): Option[(String, Int)] = {
    if (values.isEmpty) state
    else {
      // reduceByKeyAndWindow yields at most one reduced value per key,
      // so the head is the whole batch result for this word.
      val (word, count) = values.head
      // BUG FIX: >= instead of == — a count jumping past the threshold in
      // one window (e.g. 9 -> 11) must still trigger the alert and reset.
      if (count >= 10) {
        println(s"Key: $word reached count $count!")
        None
      }
      else Some((word, count))
    }
  }
}https://stackoverflow.com/questions/36229584
复制相似问题