首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我的火花流程序的流程

我的火花流程序的流程
EN

Stack Overflow用户
提问于 2017-07-07 19:23:17
回答 1查看 87关注 0票数 1

关于我的火花流程序流程的小问题。

我的职能是:

代码语言:javascript
复制
def parse(msg: String): Seq[String]

它实际上将一个“好”消息拆分成多个字符串,如果字符串是“坏的”,则返回一个空的Seq。

我正在阅读卡夫卡主题中的消息,我希望将解析结果发送到两个不同的主题:如果消息是“好的”,则将解析结果发送到主题"good_msg_topic“(如果消息”不好“),将”坏“消息发送到"bad_msg_topic”主题。

为了达到这个目标,我做到了:

代码语言:javascript
复制
stream.foreachRDD(rdd => {
  val res = rdd.map(msg => msg.value() -> parse(msg.value()))

  res.foreach(pair => {
    if (pair._2.isEmpty) {
      producer.send(junkTopic, pair._1)
    } else {
      pair._2.foreach(m => producer.send(splitTopic, m))
    }
  })
})

不过,我觉得这并不理想。使用Map对象将原始消息与结果关联起来可能会减慢进程.

我从Spark和Scala开始,所以我认为可以做得更好。

知道我能怎么改进吗?如果您认为更好的话,也可以更改解析函数的签名。

谢谢

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-07-07 19:51:00

如果您还没有对性能进行度量,并且发现了瓶颈,我就不会太关心性能了。

我能想到的一件事是使用ADT来描述结果类型,这可能使这段代码更清晰:

代码语言:javascript
复制
sealed trait Result
case class GoodResult(seq: Seq[String]) extends Result
case class BadResult(original: String) extends Result

parse返回Result

代码语言:javascript
复制
def parse(s: String): Result

然后在DStream上使用DStream而不是RDD

代码语言:javascript
复制
stream
.map(msg => parse(msg.value())
.foreachRDD { rdd => 
   rdd.foreach { result => 
     result match {
       case GoodResult(seq) => seq.foreach(value => producer.send(splitTopic, value))
       case BadResult(original) => producer.send(junkTopic, original)
     }
   }
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/44978297

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档