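Is there a way to count the occurrences of each word per line of an RDD, rather than counting over the whole RDD with map and reduce?
For example, if the RDD[String] contains these two lines:
Let's have some fun.
To have fun you don't need any plans.
Then the output should be something like a map containing key-value pairs:
("Let's",1) ("have",1) ("some",1) ("fun",1) ("To",1) ("have",1) ("fun",1) ("you",1) ("don't",1) ("need",1) ("plans",1)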
Answered 2017-05-16 07:32:24
As I understand it, you can do the following.
You said you have RDD[String] data:
val data = Seq("Let's have some fun.",
"To have fun you don't need any plans.")
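val rddData = sparkContext.parallelize(data)
You can apply flatMap to split the lines into words, then create (word, 1) tuples in a map function:
val output = rddData.flatMap(_.split(" ")).map(word => (word, 1))
This gives you the desired output:
output.foreach(println)
To count occurrences per line instead, do the following: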
// Count words within each line, then flatten the per-line (word, count) pairs.
val outputPerLine = rddData.map(_.split(" ").map((_, 1)).groupBy(_._1)
  .map { case (_, pairs) => pairs.reduce((a, b) => (a._1, a._2 + b._2)) }.toList)
  .flatMap(tuple => tuple)
Answered 2017-05-16 14:25:11
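Please don't use the RDD API if you have just started with Spark and nobody told you to use it. Spark has nicer and usually more efficient APIs for this task and for many other distributed computations over large datasets.
Using the RDD API is like using assembler for something you could write in Scala (or another higher-level programming language). When you are starting your journey into Spark, I would personally recommend Spark's higher-level APIs with DataFrames and Datasets.
Given the input: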
$ cat input.txt
Let's have some fun.
To have fun you don't need any plans.
If you want to use the Dataset API, you can do the following:
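// In spark-shell, `spark` and `$` are already available; elsewhere also add `import spark.implicits._`.
import org.apache.spark.sql.functions._ // explode, split, struct, collect_set
val lines = spark.read.text("input.txt").withColumnRenamed("value", "line")
val wordsPerLine = lines.withColumn("words", explode(split($"line", "\\s+")))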
scala> wordsPerLine.show(false)
+-------------------------------------+------+
|line                                 |words |
+-------------------------------------+------+
|Let's have some fun.                 |Let's |
|Let's have some fun.                 |have  |
|Let's have some fun.                 |some  |
|Let's have some fun.                 |fun.  |
|                                     |      |
|To have fun you don't need any plans.|To    |
|To have fun you don't need any plans.|have  |
|To have fun you don't need any plans.|fun   |
|To have fun you don't need any plans.|you   |
|To have fun you don't need any plans.|don't |
|To have fun you don't need any plans.|need  |
|To have fun you don't need any plans.|any   |
|To have fun you don't need any plans.|plans.|
+-------------------------------------+------+
scala> wordsPerLine.
  groupBy("line", "words").
  count.
  withColumn("word_count", struct($"words", $"count")).
  select("line", "word_count").
  groupBy("line").
  agg(collect_set("word_count")).
  show(truncate = false)
+-------------------------------------+------------------------------------------------------------------------------+
|line                                 |collect_set(word_count)                                                       |
+-------------------------------------+------------------------------------------------------------------------------+
|To have fun you don't need any plans.|[[fun,1], [you,1], [don't,1], [have,1], [plans.,1], [any,1], [need,1], [To,1]]|
|Let's have some fun.                 |[[have,1], [fun.,1], [Let's,1], [some,1]]                                     |
|                                     |[[,1]]                                                                        |
+-------------------------------------+------------------------------------------------------------------------------+
There you go. Simple, isn't it?
See the functions object (for the explode and struct functions).
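Note that the output above treats `fun.` and `fun` as different words and reports an empty word for the blank line. If that is not what you want, the tokens can be cleaned before counting. Below is a minimal sketch in plain Scala (no Spark required); the object `PerLineWordCount` and its methods `tokens` and `countWords` are hypothetical names introduced here for illustration, and the punctuation rule (strip only leading and trailing punctuation) is an assumption, not part of the answer above.

```scala
// Hypothetical helper for illustration; not part of the original answer.
object PerLineWordCount {
  // Split on whitespace, strip leading/trailing punctuation, drop empty tokens.
  // Inner apostrophes (as in "don't") are kept.
  def tokens(line: String): Seq[String] =
    line.split("\\s+").toSeq
      .map(_.replaceAll("^\\p{Punct}+|\\p{Punct}+$", ""))
      .filter(_.nonEmpty)

  // Count how often each cleaned token occurs within a single line.
  def countWords(line: String): Map[String, Int] =
    tokens(line).groupBy(identity).map { case (word, occ) => (word, occ.size) }
}
```

With an RDD[String] this could be applied as `lines.map(PerLineWordCount.countWords)`. Using it from the Dataset API would need a typed `map` or a UDF, which is left out of this sketch.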
Answered 2017-05-16 07:32:13
What you want is to turn each line into a Map of (word, count). So you can define a function that counts the words of a single line:
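def wordsCount(line: String): Map[String, Int] = {
  // map (not mapValues) builds a strict Map; mapValues returns a lazy view that is not serializable in Scala 2.11/2.12
  line.split(" ").groupBy(identity).map { case (word, occurrences) => (word, occurrences.length) }
}
Then apply it to your RDD[String]: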
import org.apache.spark.rdd.RDD
val lines: RDD[String] = ...
val wordsByLineRDD: RDD[Map[String, Int]] = lines.map(wordsCount)
// this should give you a Map per line with count of each word
wordsByLineRDD.take(2)
// Something like
// Array(Map(some -> 1, have -> 1, Let's -> 1, fun. -> 1), Map(any -> 1, have -> 1, don't -> 1, you -> 1, need -> 1, fun -> 1, To -> 1, plans. -> 1))
https://stackoverflow.com/questions/43994499