首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >通用状态管理

通用状态管理
EN

Stack Overflow用户
提问于 2017-01-06 08:40:40
回答 1查看 127关注 0票数 1

这个问题是State management not serializable的后续问题.

我想封装状态管理逻辑。

以下是我现在所处的位置:

代码语言:javascript
复制
class StateManager(
  stream: DStream[(String, String)],
  updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}

object StateManager {
  def apply(
    _dStream: DStream[(String, String)],
    _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    new StateManager(dStream, updateState)
}

这很好,但只允许处理DStream[(String,String)],这是迈向通用状态管理的第一步,适合欢迎任何DStream:从DStream[(Int,String)]DStream[(String,myCustomClass)]

myState需要是一个值函数才能工作(serialization)。

但是我面临一个问题,因为type parameter不适用于scala中的函数对象。

user6910411通过使用ClassTag和一个封闭方法(Type-parameterize a DStream)给了我一个提示,但反过来它仍然是一种方法。

有没有人能得到一些关于如何克服这些困难的情报?

背景:

火花1.6

星火图:

代码语言:javascript
复制
object Consumer_Orchestrator {
    def main(args: Array[String]) = {
        //setup configurations

        val streamingContext = StreamingEnvironment(/*configurations*/)

        val kafkaStream = streamingContext.stream()

        val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] = (key, value, state) => {/*some code*/}
        val initialState = emptyRDD

        val stateManager = StateManager(kafkaStream, updateState)
        val state: DStream[(String, String)] = stateManager.myState

        state.foreachRDD(_.foreach(println))

        myStreamingContext.start()
        myStreamingContext.awaitTermination()
    }
}

创建StreamingEnvironment类的Streaming

代码语言:javascript
复制
class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
    val sparkContext = spark.SparkContext.getOrCreate(sparkConf)
    lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))

    mStreamingContext.checkpoint(/*directory checkpoint*/)
    mStreamingContext.remember(Minutes(1))

    def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, myKafkaConf.mBrokers, myKafkaConf.mTopics)
    def stop() = sparkContext.stop()
}

object StreamingEnvironment {
    def apply(kafkaConf: KafkaConf) = {
    val sparkConf = new SparkConf

    new StreamingEnvironment(sparkConf, kafkaConf)
    }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-01-06 09:37:07

给你:

  • App.scala: 导入org.apache.spark.streaming.dstream.ConstantInputDStream org.apache.spark.{SparkContext,SparkConf}导入org.apache.spark.streaming._导入statemanager._对象应用程序{ def (args: ArrayString):Unit ={ val =新SparkContext("local*",“泛型”,新SparkConf()) val ssc =新StreamingContext( sc,秒(10)) ssc.checkpoint("/tmp/chk") StateManager(新ConstantInputDStream,sc.parallelize(Seq(A),1),),(_:String,_:OptionInt,_:StateInt) =>选项(1) ).myState.print ssc.start() ssc.awaitTermination() }
  • StateManage.scala: 包状态器导入scala.reflect.ClassTag导入org.apache.spark.streaming.{State,StateSpec}导入org.apache.spark.streaming.dstream.DStream类StateManagerT : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag {惰性val myState = stream.mapWithState( stateSpec ).map(_.get)惰性val stateSpec= StateSpec.function(updateStateFunction) } object StateManager { def applyT : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag =新statemanager (StateManager,StateManager)}
  • build.sbt: scalaVersion := "2.11.8“val sparkVersion = "2.1.0”libraryDependencies ++= Seq( "org.apache.spark“%”火花-核心“% sparkVersion,"org.apache.spark”%“火花流”% sparkVersion )
  • 目录结构: App.scala build.sbt└──StateManage.scala
  • 示例执行: sbt运行.

你可以看到这里没有魔法。如果引入泛型参数,则需要在相同的上下文中使用ClassTags

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41501895

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档