
Restart ReduceByKeyAndWindows

Stack Overflow user
Asked 2016-03-25 23:15:07
Answers: 1, Views: 254, Followers: 0, Score: 1

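I'm very new to Spark and Scala. I'm using reduceByKeyAndWindow to count words in Kafka messages, because I need the windowing functionality.

The purpose of my application is to raise an alert when it detects "x" Kafka messages containing a specific word within a specific time period, and then start again from the beginning.

The code below detects the word, but I can't make my application restart. I'm wondering whether it's possible to reset the accumulation of reduceByKeyAndWindow, or whether there is another way to do this.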

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

object KafKaWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafKaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    ssc.checkpoint("checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999) // using netcat for testing; each line is treated as one word
    val wordCounts =
      lines.map(x => (x, 1))
        .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2)

    // If the count for a key (word) exceeds 10, send an alert and restart the count
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
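To see why there is nothing to "restart" inside reduceByKeyAndWindow, here is a plain-Scala model (no Spark required) of what reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2) computes for a single key. With a 60-second window sliding every 2 seconds the window spans 30 batches, and the inverse function _ - _ lets Spark update the previous total by adding the batch that enters and subtracting the batch that leaves, instead of re-summing all 30. The object and method names and the per-batch counts are hypothetical, and a 3-batch window is used to keep the numbers small. Both forms always equal the sum of the batches still inside the window, so there is no separate counter that could be reset.

```scala
import scala.collection.mutable.ArrayBuffer

// Plain-Scala model (no Spark) of a windowed count for a single key.
// batches(i) is the hypothetical number of times the word arrived in batch i.
object SlidingWindowModel {
  // Incremental update, as with reduceByKeyAndWindow(_ + _, _ - _, ...):
  // add the batch entering the window, subtract the batch leaving it.
  def incrementalTotals(batches: Seq[Int], windowBatches: Int): List[Int] = {
    val out = ArrayBuffer.empty[Int]
    var total = 0
    for (i <- batches.indices) {
      total += batches(i)                                         // reduceFunc: _ + _
      if (i >= windowBatches) total -= batches(i - windowBatches) // invReduceFunc: _ - _
      out += total
    }
    out.toList
  }

  // Reference: re-sum the last windowBatches batches from scratch.
  def naiveTotals(batches: Seq[Int], windowBatches: Int): List[Int] =
    batches.indices.map { i =>
      batches.slice(math.max(0, i - windowBatches + 1), i + 1).sum
    }.toList

  def main(args: Array[String]): Unit = {
    val perBatch = Seq(1, 0, 2, 1, 0, 0, 3) // hypothetical counts per 2 s batch
    // The real job has 60 s / 2 s = 30 batches per window; 3 here for brevity
    println(incrementalTotals(perBatch, 3))
    println(naiveTotals(perBatch, 3))
  }
}
```

In the real job, Spark requires checkpointing for the inverse-function variant of reduceByKeyAndWindow, which is why the code above calls ssc.checkpoint.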

Using Yuval's second example, I lowered the threshold ("reach") from 10 to 3 and sent 7 messages.

The output of that second example is:

Word: hello reached count: 1
Word: hello reached count: 2
// This message is not printed, which is fine, but the next count does not start again at 1
Word: hello reached count: 4
Word: hello reached count: 5
Word: hello reached count: 6
Word: hello reached count: 7

The output I expect:

Word: hello reached count: 1
Word: hello reached count: 2

Word: hello reached count: 1
Word: hello reached count: 2

Word: hello reached count: 1
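The difference between the two outputs can be reproduced without Spark. Assuming one "hello" per 2-second batch and all seven messages inside the 60-second window, the function downstream of reduceByKeyAndWindow sees the window totals 1, 2, ..., 7. Removing its own state at 3 does not lower the next total, so it continues at 4. Only a counter that accumulates per-message increments in its own state can start again at 1. This is a hypothetical model (ResetModel and its methods are illustrative names), not Spark's API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model (no Spark) of the alert-and-restart rule for one word.
object ResetModel {
  // Input: the window total for each batch, as produced by reduceByKeyAndWindow.
  // Whatever state the downstream function removes, the next total it receives
  // still includes every earlier message in the window, so only the threshold
  // value itself is skipped and counting continues above it.
  def fromWindowTotals(totals: Seq[Int], threshold: Int): List[Int] =
    totals.filterNot(_ == threshold).toList

  // Input: the new messages per batch. The count lives in our own state
  // (count stands in for the per-key state), so it can be reset to zero.
  def fromIncrements(increments: Seq[Int], threshold: Int): List[Int] = {
    val printed = ArrayBuffer.empty[Int]
    var count = 0
    for (inc <- increments) {
      count += inc
      if (count >= threshold) {
        count = 0 // the alert would fire here; start again from zero
      } else {
        printed += count
      }
    }
    printed.toList
  }

  def main(args: Array[String]): Unit = {
    println(fromWindowTotals(1 to 7, 3))       // 7 messages, threshold 3
    println(fromIncrements(Seq.fill(7)(1), 3)) // 7 messages, threshold 3
  }
}
```

Resetting to zero discards any overshoot when several messages arrive in the same batch; carrying count - threshold over instead is an alternative if that matters.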
1 Answer

Stack Overflow user

Answered 2016-03-26 18:19:04

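If you're using Spark 1.6.0 or later, you can use the experimental DStream.mapWithState to keep an up-to-date state of the word counts. Once the limit is reached, you can remove the state, release the (word, count) pair into the pipeline, and print it with DStream.foreachRDD.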

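import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object KafKaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafKaWordCount")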

    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test
    val stateSpec = StateSpec.function(updateWordCount _)

    lines.map(x => (x, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2)
      .mapWithState(stateSpec)
      .flatMap(_.toList) // keep only the Some values, unwrapped to (word, count)
      .foreachRDD(rdd =>
        rdd.foreach { case (word, count) =>
          println(s"Word: $word reached count: $count") })
    ssc.start()
    ssc.awaitTermination()
  }

  def updateWordCount(key: String, 
                      value: Option[Int], 
                      state: State[(String, Int)]): Option[(String, Int)] = {
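    // Called once per record of the windowed stream (one per word per batch);
    // count is the word's current 60-second window total.
    def updateCountState(count: Int): Option[(String, Int)] = {
      if (count == 10) {
        // Threshold reached: drop the stored state and emit (word, count)
        if (state.exists()) state.remove()
        Some((key, count))
      }
      else {
        // Below the threshold: remember the latest total and emit nothing
        state.update((key, count))
        None
      }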
    }

    value match {
      case Some(count) => updateCountState(count)
      case _ => None
    }
  }
}
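One caveat with this version: count is the 60-second window total, which can jump by more than one between batches (two messages in the same 2-second batch) and can also fall back as old messages leave the window. The plain-Scala check below, with hypothetical totals and illustrative names, shows that an exact == 10 test can miss the threshold entirely or fire twice, while >= 10 fires on every batch spent at or above it. Removing the state in updateWordCount changes neither behavior, because the next batch still delivers the window total.

```scala
// Plain-Scala check (no Spark) of the threshold test in updateWordCount.
// Each element is a hypothetical window total seen in one 2-second batch.
object ThresholdCheck {
  // Batches in which count == threshold would emit an alert
  def equalityHits(totals: Seq[Int], threshold: Int): List[Int] =
    totals.filter(_ == threshold).toList

  // Batches in which count >= threshold would emit an alert
  def atLeastHits(totals: Seq[Int], threshold: Int): List[Int] =
    totals.filter(_ >= threshold).toList

  def main(args: Array[String]): Unit = {
    val jumpsOver = Seq(4, 9, 11, 12)  // two messages arrive in one batch: 9 -> 11
    val fallsBack = Seq(9, 10, 11, 10) // an old message leaves the window: 11 -> 10
    println(equalityHits(jumpsOver, 10)) // no alert at all
    println(equalityHits(fallsBack, 10)) // two alerts
    println(atLeastHits(jumpsOver, 10))  // an alert on every batch at or above 10
  }
}
```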

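If not, you can use the slower DStream.updateStateByKey:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafKaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafKaWordCount")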
    val ssc = new StreamingContext(conf, Seconds(2))

    ssc.checkpoint("checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test

    lines.map(x => (x, (x, 1)))
         .reduceByKeyAndWindow((first: (String, Int), second: (String, Int)) =>
                               (first._1, first._2 + second._2), Seconds(60), Seconds(60), 2)
         .updateStateByKey(updateSeqCount _)
         .print(1)

    ssc.start()
    ssc.awaitTermination()
  }

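  // Called for each key every time the 60-second window slides. values holds the
  // key's (word, count) pairs for this window (at most one after the windowed reduce).
  def updateSeqCount(values: Seq[(String, Int)],
                     state: Option[(String, Int)]): Option[(String, Int)] = {
    if (values.isEmpty) state
    else {
      val (word, count) = values.head
      if (count == 10) {
        println(s"Key: $word reached count $count!")
        None // returning None removes the key's state
      }
      else Some((word, count))
    }
  }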
}
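If the goal is the output the question expects (1, 2, then 1 again after the alert), a sketch under different assumptions is to drop the window and let the state hold the running count itself. The function below has the (Seq[V], Option[S]) => Option[S] shape that updateStateByKey takes: it adds this batch's new occurrences to the stored count and returns None, which removes the key, once the threshold is reached. The object name, threshold value, and alert message are hypothetical, and without a window the count no longer expires after 60 seconds.

```scala
// Hypothetical update function for lines.map(x => (x, 1)).updateStateByKey(...).
// Pure Scala, so it can be exercised without a StreamingContext.
object RunningCount {
  val Threshold = 3 // the asker's test value; 10 in the original question

  // values: this batch's occurrences of the key (one 1 per message)
  // state:  the count carried over from earlier batches, if any
  def update(values: Seq[Int], state: Option[Int]): Option[Int] =
    if (values.isEmpty) state // key not seen in this batch: keep its count
    else {
      val count = state.getOrElse(0) + values.sum
      if (count >= Threshold) {
        println(s"Alert: reached $count (threshold $Threshold)")
        None // drop the state, so the next occurrence starts again from 1
      } else Some(count)
    }

  def main(args: Array[String]): Unit = {
    // Simulate seven batches with one message each, threading the state by hand
    val states = (1 to 7).scanLeft(Option.empty[Int])((s, _) => update(Seq(1), s)).tail
    println(states)
  }
}
```

In a streaming job this would be wired up as lines.map(x => (x, 1)).updateStateByKey[Int](RunningCount.update _), which, like the code above, needs ssc.checkpoint to be set. The println inside update runs on the executors, not the driver.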
Score: 0

Original page content provided by Stack Overflow. Translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/36229584
