考虑到一个庞大的tweet数据集,我需要:
所以,我想到的第一件事就是做这样的事:
val tweets = sparkContext.textFile(DATASET).cache
val hashtags = tweets
.map(extractHashTags)
.map((_, 1))
.reduceByKey(_ + _)
val emoticonsEmojis = tweets
.map(extractEmoticonsEmojis)
.map((_, 1))
.reduceByKey(_ + _)
val lemmas = tweets
.map(extractLemmas)
.map((_, 1))
.reduceByKey(_ + _) 但以这种方式,每条推文被处理了3次,对吗?如果是这样的话,是否有一种有效的方法来单独计算所有这些元素,只处理每条推文一次?
我在想这样的事情:
sparkContext.textFile(DATASET)
.map(extractor) // RDD[(List[String], List[String], List[String])]但这样它就变成了一场噩梦。另外,因为一旦我数了单词(我指的是请求的第三点),我将需要与另一个RDD连接,而在第一个版本中,这是非常简单的,而在第二个版本中则不是。
发布于 2018-06-15 18:13:05
使用Dataset API:
val tweets = sparkContext.textFile(DATASET)
val tokens = tweets.flatMap(extractor) //return RDD[(String, String)]
.toDF("type", "token") //type is one of ("hashtag", "emoticon", "lemma")
.groupBy("type", "token")
.count() //Dataset[Row] which has columns ("type", "token", "count")
val lemmas = tokens
.where($"type" === lit("lemma"))
.select("token", "count")
.as[(String, Long)]
.rdd //should be the same type as your original 'lemmas', for future join发布于 2018-06-15 18:15:08
也许是这样的?
sealed trait TokenType { }
object Hashtag extends TokenType
object Emoji extends TokenType
object Word extends TokenType
def extractTokens(tweet: String): Seq[(TokenType, String)] = {
...
}
val tokenCounts = tweets
.flatMap(extractTokens)
.map((_, 1))
.reduceByKey(_ + _)
val hashtagCounts = tokenCounts.collect { case ((Hashtag, x), count) => (x, count) }
// similar for emojis and wordshttps://stackoverflow.com/questions/50880170
复制相似问题