关于我的火花流程序流程的小问题。
我的职能是:
def parse(msg: String): Seq[String]它实际上将一个“好”消息拆分成多个字符串,如果字符串是“坏的”,则返回一个空的Seq。
我正在阅读卡夫卡主题中的消息,我希望将解析结果发送到两个不同的主题:如果消息是“好的”,则将解析结果发送到主题"good_msg_topic“(如果消息”不好“),将”坏“消息发送到"bad_msg_topic”主题。
为了达到这个目标,我做到了:
stream.foreachRDD(rdd => {
val res = rdd.map(msg => msg.value() -> parse(msg.value()))
res.foreach(pair => {
if (pair._2.isEmpty) {
producer.send(junkTopic, pair._1)
} else {
pair._2.foreach(m => producer.send(splitTopic, m))
}
})
})不过,我觉得这并不理想。使用Map对象将原始消息与结果关联起来可能会减慢进程.
我从Spark和Scala开始,所以我认为可以做得更好。
知道我能怎么改进吗?如果您认为更好的话,也可以更改解析函数的签名。
谢谢
发布于 2017-07-07 19:51:00
如果您还没有对性能进行度量,并且发现了瓶颈,我就不会太关心性能了。
我能想到的一件事是使用ADT来描述结果类型,这可能使这段代码更清晰:
sealed trait Result
case class GoodResult(seq: Seq[String]) extends Result
case class BadResult(original: String) extends Result让parse返回Result
def parse(s: String): Result然后在DStream上使用DStream而不是RDD
stream
.map(msg => parse(msg.value())
.foreachRDD { rdd =>
rdd.foreach { result =>
result match {
case GoodResult(seq) => seq.foreach(value => producer.send(splitTopic, value))
case BadResult(original) => producer.send(junkTopic, original)
}
}
}https://stackoverflow.com/questions/44978297
复制相似问题