This question is a follow-up to State management not serializable.
I want to encapsulate the state management logic.
Here is where I stand now:
class StateManager(
  stream: DStream[(String, String)],
  updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}
object StateManager {
  def apply(
    _dStream: DStream[(String, String)],
    _updateState: (String, Option[String], State[String]) => Option[(String, String)]
  ) =
    new StateManager(_dStream, _updateState)
}

This works fine, but it only accepts a DStream[(String, String)]. It is a first step toward generic state management that would welcome any DStream: from DStream[(Int, String)] to DStream[(String, myCustomClass)].
myState needs to be a function value for this to work (serialization).
But I am facing a problem, because type parameters do not apply to function objects in Scala.
user6910411 gave me a hint by using a ClassTag with an enclosing method (Type-parameterize a DStream), but in the end it is still a method.
Does anyone have any insight on how to overcome these difficulties?
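For illustration (this sketch is mine, not part of the original post, and the names are hypothetical): in Scala 2, only methods can declare type parameters; a function value is an instance of a concrete FunctionN type and must be monomorphic.

  // A method can be generic:
  def firstOfPair[A, B](pair: (A, B)): A = pair._1

  // A function value cannot declare its own type parameters;
  // the line below is not valid Scala 2 syntax:
  // val firstOfPairF = [A, B](pair: (A, B)) => pair._1

  // A function value has to be fixed at concrete types:
  val firstOfPairF: ((String, Int)) => String = firstOfPair[String, Int]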
Context:
Spark 1.6
The Spark app:
object Consumer_Orchestrator {
  def main(args: Array[String]) = {
    // setup configurations
    val streamingContext = StreamingEnvironment(/*configurations*/)
    val kafkaStream = streamingContext.stream()

    val updateStateFunction: (String, Option[String], State[String]) => Option[(String, String)] =
      (key, value, state) => {/*some code*/}
    val initialState = emptyRDD

    val stateManager = StateManager(kafkaStream, updateStateFunction)
    val state: DStream[(String, String)] = stateManager.myState

    state.foreachRDD(_.foreach(println))

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

The StreamingEnvironment class, which creates the streaming context:
class StreamingEnvironment(sparkConf: SparkConf, kafkaConf: KafkaConf) {
  val sparkContext = SparkContext.getOrCreate(sparkConf)
  lazy val streamingContext = new StreamingContext(sparkContext, Seconds(30))

  streamingContext.checkpoint(/*directory checkpoint*/)
  streamingContext.remember(Minutes(1))

  def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaConf.mBrokers, kafkaConf.mTopics)

  def stop() = sparkContext.stop()
}
object StreamingEnvironment {
  def apply(kafkaConf: KafkaConf) = {
    val sparkConf = new SparkConf
    new StreamingEnvironment(sparkConf, kafkaConf)
  }
}

Answered on 2017-01-06 09:37:07
Here you go:
App.scala:
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming._
import statemanager._

object App {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[*]", "generic", new SparkConf())
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("/tmp/chk")

    StateManager(
      new ConstantInputDStream(ssc, sc.parallelize(Seq(("a", 1)), 1)),
      (_: String, _: Option[Int], _: State[Int]) => Option(1)
    ).myState.print

    ssc.start()
    ssc.awaitTermination()
  }
}

StateManager.scala:

package statemanager

import scala.reflect.ClassTag
import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

class StateManager[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
  stream: DStream[(T, U)],
  updateStateFunction: (T, Option[U], State[V]) => Option[W]
) {
  lazy val myState = stream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec.function(updateStateFunction)
}

object StateManager {
  def apply[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
    stream: DStream[(T, U)],
    updateStateFunction: (T, Option[U], State[V]) => Option[W]
  ) = new StateManager(stream, updateStateFunction)
}

build.sbt:

scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)

As you can see, there is no magic here. If you introduce generic arguments, you need ClassTags in the same context.
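To make the "same context" remark concrete: a context bound such as [T : ClassTag] is only syntactic sugar for an extra implicit parameter, so the tags are materialized at the call site, where the concrete types are known. A minimal sketch (hypothetical names, my illustration rather than part of the answer):

  import scala.reflect.ClassTag

  // The context-bound form...
  def describe[T : ClassTag](xs: Seq[T]): String =
    implicitly[ClassTag[T]].runtimeClass.getSimpleName

  // ...desugars to an explicit implicit parameter:
  def describeExplicit[T](xs: Seq[T])(implicit tag: ClassTag[T]): String =
    tag.runtimeClass.getSimpleName

The call site of the generic StateManager therefore stays as simple as before, e.g. StateManager(kafkaStream, updateStateFunction): the compiler infers T, U, V and W and supplies the ClassTags implicitly.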
https://stackoverflow.com/questions/41501895